ratis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject incubator-ratis git commit: RATIS-402. Limit the data size in RaftLogWorker queue.
Date Fri, 07 Dec 2018 00:57:32 GMT
Repository: incubator-ratis
Updated Branches:
  refs/heads/master 74a3a7ce2 -> 3b0be0287


RATIS-402. Limit the data size in RaftLogWorker queue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/3b0be028
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/3b0be028
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/3b0be028

Branch: refs/heads/master
Commit: 3b0be0287f2c15de90552a6a83b58251a0eab8e8
Parents: 74a3a7c
Author: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Authored: Thu Dec 6 16:56:28 2018 -0800
Committer: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Committed: Thu Dec 6 16:56:28 2018 -0800

----------------------------------------------------------------------
 .../apache/ratis/util/DataBlockingQueue.java    | 154 +++++++++++++++++++
 .../java/org/apache/ratis/util/DataQueue.java   |  12 +-
 .../ratis/server/RaftServerConfigKeys.java      |  23 ++-
 .../ratis/server/storage/RaftLogWorker.java     |  26 ++--
 .../ratis/server/storage/SegmentedRaftLog.java  |   4 +
 .../ratis/util/TestDataBlockingQueue.java       | 117 ++++++++++++++
 .../org/apache/ratis/util/TestDataQueue.java    |  48 +++---
 7 files changed, 349 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3b0be028/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
new file mode 100644
index 0000000..c71dea5
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedFunctionWithTimeout;
+import org.apache.ratis.util.function.TriConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.ToIntFunction;
+
+/**
+ * A queue for data elements
+ * such that the queue imposes limits on both number of elements and the data size in bytes.
+ *
+ * Null element is NOT supported.
+ *
+ * This class is threadsafe.
+ */
+public class DataBlockingQueue<E> extends DataQueue<E> {
+  public static final Logger LOG = LoggerFactory.getLogger(DataBlockingQueue.class);
+
+  private final Lock lock = new ReentrantLock();
+  private final Condition notFull  = lock.newCondition();
+  private final Condition notEmpty = lock.newCondition();
+
+  public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToIntFunction<E> getNumBytes) {
+    super(name, byteLimit, elementLimit, getNumBytes);
+  }
+
+  @Override
+  public int getNumBytes() {
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      return super.getNumBytes();
+    }
+  }
+
+  @Override
+  public int getNumElements() {
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      return super.getNumElements();
+    }
+  }
+
+  @Override
+  public void clear() {
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      super.clear();
+      notFull.signal();
+    }
+  }
+
+  @Override
+  public boolean offer(E element) {
+    Objects.requireNonNull(element, "element == null");
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      if (super.offer(element)) {
+        notEmpty.signal();
+        return true;
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Adds an element to this queue, waiting up to the given timeout.
+   *
+   * @return true if the element is added successfully;
+   *         otherwise, the element is not added, return false.
+   */
+  public boolean offer(E element, TimeDuration timeout) throws InterruptedException {
+    Objects.requireNonNull(element, "element == null");
+    long nanos = timeout.toLong(TimeUnit.NANOSECONDS);
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      for(;;) {
+        if (super.offer(element)) {
+          notEmpty.signal();
+          return true;
+        }
+        if (nanos <= 0) {
+          return false;
+        }
+        nanos = notFull.awaitNanos(nanos);
+      }
+
+    }
+  }
+
+  @Override
+  public E poll() {
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      final E polled = super.poll();
+      if (polled != null) {
+        notFull.signal();
+      }
+      return polled;
+    }
+  }
+
+  /**
+   * Poll out the head element from this queue, waiting up to the given timeout.
+   */
+  public E poll(TimeDuration timeout) throws InterruptedException {
+    long nanos = timeout.toLong(TimeUnit.NANOSECONDS);
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      for(;;) {
+        final E polled = super.poll();
+        if (polled != null) {
+          notFull.signal();
+          return polled;
+        }
+        if (nanos <= 0) {
+          return null;
+        }
+        nanos = notEmpty.awaitNanos(nanos);
+      }
+    }
+  }
+
+  @Override
+  public <RESULT, THROWABLE extends Throwable> List<RESULT> pollList(long timeoutMs,
+      CheckedFunctionWithTimeout<E, RESULT, THROWABLE> getResult,
+      TriConsumer<E, TimeDuration, TimeoutException> timeoutHandler) throws THROWABLE {
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      final List<RESULT> results = super.pollList(timeoutMs, getResult, timeoutHandler);
+      if (!results.isEmpty()) {
+        notFull.signal();
+      }
+      return results;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3b0be028/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index d7819cf..8d4ec99 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -61,6 +61,14 @@ public class DataQueue<E> {
     this.q = new ArrayDeque<>(elementLimit);
   }
 
+  public int getElementLimit() {
+    return elementLimit;
+  }
+
+  public int getByteLimit() {
+    return byteLimit;
+  }
+
   public int getNumBytes() {
     return numBytes;
   }
@@ -136,7 +144,9 @@ public class DataQueue<E> {
   /** Poll out the head element from this queue. */
   public E poll() {
     final E polled = q.poll();
-    numBytes -= getNumBytes.applyAsInt(polled);
+    if (polled != null) {
+      numBytes -= getNumBytes.applyAsInt(polled);
+    }
     return polled;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3b0be028/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 32f5752..ab8e50f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -114,13 +114,24 @@ public interface RaftServerConfigKeys {
       setBoolean(properties::setBoolean, USE_MEMORY_KEY, useMemory);
     }
 
-    String QUEUE_SIZE_KEY = PREFIX + ".queue.size";
-    int QUEUE_SIZE_DEFAULT = 4096;
-    static int queueSize(RaftProperties properties) {
-      return getInt(properties::getInt, QUEUE_SIZE_KEY, QUEUE_SIZE_DEFAULT, getDefaultLog(), requireMin(1));
+    String QUEUE_ELEMENT_LIMIT_KEY = PREFIX + ".queue.element-limit";
+    int QUEUE_ELEMENT_LIMIT_DEFAULT = 4096;
+    static int queueElementLimit(RaftProperties properties) {
+      return getInt(properties::getInt, QUEUE_ELEMENT_LIMIT_KEY, QUEUE_ELEMENT_LIMIT_DEFAULT, getDefaultLog(),
+          requireMin(1));
     }
-    static void setQueueSize(RaftProperties properties, int queueSize) {
-      setInt(properties::setInt, QUEUE_SIZE_KEY, queueSize, requireMin(1));
+    static void setElementLimit(RaftProperties properties, int queueSize) {
+      setInt(properties::setInt, QUEUE_ELEMENT_LIMIT_KEY, queueSize, requireMin(1));
+    }
+
+    String QUEUE_BYTE_LIMIT_KEY = PREFIX + ".queue.byte-limit";
+    SizeInBytes QUEUE_BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("64MB");
+    static SizeInBytes queueByteLimit(RaftProperties properties) {
+      return getSizeInBytes(properties::getSizeInBytes,
+          QUEUE_BYTE_LIMIT_KEY, QUEUE_BYTE_LIMIT_DEFAULT, getDefaultLog());
+    }
+    static void setByteLimit(RaftProperties properties, int queueSize) {
+      setInt(properties::setInt, QUEUE_BYTE_LIMIT_KEY, queueSize, requireMin(1));
     }
 
     String SEGMENT_SIZE_MAX_KEY = PREFIX + ".segment.size.max";

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3b0be028/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index da8daa9..ef5611c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -39,8 +39,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -52,6 +50,8 @@ import java.util.function.Supplier;
 class RaftLogWorker implements Runnable {
   static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class);
 
+  static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+
   static class StateMachineDataPolicy {
     private final boolean sync;
     private final TimeDuration syncTimeout;
@@ -89,7 +89,7 @@ class RaftLogWorker implements Runnable {
   /**
    * The task queue accessed by rpc handler threads and the io worker thread.
    */
-  private final BlockingQueue<Task> queue;
+  private final DataBlockingQueue<Task> queue;
   private volatile boolean running = true;
   private final Thread workerThread;
 
@@ -126,7 +126,11 @@ class RaftLogWorker implements Runnable {
     this.stateMachine = stateMachine;
 
     this.storage = storage;
-    this.queue = new ArrayBlockingQueue<>(RaftServerConfigKeys.Log.queueSize(properties));
+
+    final SizeInBytes queueByteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
+    final int queueElementLimit = RaftServerConfigKeys.Log.queueElementLimit(properties);
+    this.queue = new DataBlockingQueue<>(name, queueByteLimit, queueElementLimit, Task::getSerializedSize);
+
     this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
     this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
@@ -186,10 +190,9 @@ class RaftLogWorker implements Runnable {
   private Task addIOTask(Task task) {
     LOG.debug("{} adds IO task {}", name, task);
     try {
-      if (!queue.offer(task, 1, TimeUnit.SECONDS)) {
+      for(; !queue.offer(task, ONE_SECOND); ) {
         Preconditions.assertTrue(isAlive(),
             "the worker thread is not alive");
-        queue.put(task);
       }
     } catch (Throwable t) {
       if (t instanceof InterruptedException && !running) {
@@ -210,7 +213,7 @@ class RaftLogWorker implements Runnable {
   public void run() {
     while (running) {
       try {
-        Task task = queue.poll(1, TimeUnit.SECONDS);
+        Task task = queue.poll(ONE_SECOND);
         if (task != null) {
           try {
             task.execute();
@@ -231,7 +234,7 @@ class RaftLogWorker implements Runnable {
               Thread.currentThread().getName());
         }
         LOG.info(Thread.currentThread().getName()
-            + " was interrupted, exiting. There are " + queue.size()
+            + " was interrupted, exiting. There are " + queue.getNumElements()
             + " tasks remaining in the queue.");
         Thread.currentThread().interrupt();
         return;
@@ -336,6 +339,11 @@ class RaftLogWorker implements Runnable {
     }
 
     @Override
+    int getSerializedSize() {
+      return ServerProtoUtils.getSerializedSize(entry);
+    }
+
+    @Override
     CompletableFuture<Long> getFuture() {
       return combined;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3b0be028/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index f5a7330..d2b579d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -88,6 +88,10 @@ public class SegmentedRaftLog extends RaftLog {
 
     abstract long getEndIndex();
 
+    int getSerializedSize() {
+      return 0;
+    }
+
     @Override
     public String toString() {
       return getClass().getSimpleName() + ":" + getEndIndex();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3b0be028/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
new file mode 100644
index 0000000..d7fc520
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestDataBlockingQueue {
+  static final Logger LOG = LoggerFactory.getLogger(TestDataBlockingQueue.class);
+
+  final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
+  final int elementLimit = 10;
+  final DataBlockingQueue<Integer> q = new DataBlockingQueue<>(null, byteLimit, elementLimit, Integer::intValue);
+
+  final TimeDuration slow = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+  final TimeDuration fast = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
+
+  @Test(timeout = 1000)
+  public void testElementLimit() {
+    TestDataQueue.runTestElementLimit(q);
+  }
+
+  @Test(timeout = 1000)
+  public void testByteLimit() {
+    TestDataQueue.runTestByteLimit(q);
+  }
+
+  @Test(timeout = 10_000)
+  public void testSlowOfferFastPoll() throws Exception {
+    runTestBlockingCalls(slow, fast, q);
+  }
+
+  @Test(timeout = 10_000)
+  public void testFastOfferSlowPoll() throws Exception {
+    runTestBlockingCalls(fast, slow, q);
+  }
+
+  static void assertOfferPull(int offering, int polled, int elementLimit) {
+    Assert.assertTrue(offering >= polled);
+    Assert.assertTrue(offering - polled <= elementLimit + 1);
+  }
+
+  static void runTestBlockingCalls(TimeDuration offerSleepTime, TimeDuration pollSleepTime,
+      DataBlockingQueue<Integer> q) throws Exception {
+    Assert.assertTrue(q.isEmpty());
+    ExitUtils.disableSystemExit();
+    final int elementLimit = q.getElementLimit();
+    final TimeDuration timeout = CollectionUtils.min(offerSleepTime, pollSleepTime);
+
+    final AtomicInteger offeringValue = new AtomicInteger();
+    final AtomicInteger polledValue = new AtomicInteger();
+    final int endValue = 30;
+
+    final Thread pollThread = new Thread(() -> {
+      try {
+        for(; polledValue.get() < endValue;) {
+          pollSleepTime.sleep();
+          final Integer polled = q.poll(timeout);
+          if (polled != null) {
+            Assert.assertEquals(polledValue.incrementAndGet(), polled.intValue());
+            LOG.info("polled {}", polled);
+          }
+          assertOfferPull(offeringValue.get(), polledValue.get(), elementLimit);
+        }
+      } catch (Throwable t) {
+        ExitUtils.terminate(-2, "pollThread failed", t, null);
+      }
+    });
+
+    final Thread offerThread = new Thread(() -> {
+      try {
+        for(offeringValue.incrementAndGet(); offeringValue.get() <= endValue; ) {
+          offerSleepTime.sleep();
+          final boolean offered = q.offer(offeringValue.get(), timeout);
+          if (offered) {
+            LOG.info("offered {}", offeringValue.getAndIncrement());
+          }
+          assertOfferPull(offeringValue.get(), polledValue.get(), elementLimit);
+        }
+      } catch (Throwable t) {
+        ExitUtils.terminate(-1, "offerThread failed", t, null);
+      }
+    });
+
+    pollThread.start();
+    offerThread.start();
+
+    offerThread.join();
+    pollThread.join();
+
+    Assert.assertEquals(endValue + 1, offeringValue.get());
+    Assert.assertEquals(endValue, polledValue.get());
+
+    Assert.assertTrue(q.isEmpty());
+    ExitUtils.assertNotTerminated();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3b0be028/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
index e465a1d..08bc36d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
@@ -33,7 +33,7 @@ public class TestDataQueue {
     };
   }
 
-  private void assertSizes(int expectedNumElements, int expectedNumBytes) {
+  static void assertSizes(int expectedNumElements, int expectedNumBytes, DataQueue<?> q) {
     Assert.assertEquals(expectedNumElements, q.getNumElements());
     Assert.assertEquals(expectedNumBytes, q.getNumBytes());
   }
@@ -44,8 +44,13 @@ public class TestDataQueue {
 
   @Test(timeout = 1000)
   public void testElementLimit() {
-    assertSizes(0, 0);
+    runTestElementLimit(q);
+  }
+
+  static void runTestElementLimit(DataQueue<Integer> q) {
+    assertSizes(0, 0, q);
 
+    final int elementLimit = q.getElementLimit();
     int numBytes = 0;
     for (int i = 0; i < elementLimit; i++) {
       Assert.assertEquals(i, q.getNumElements());
@@ -53,12 +58,12 @@ public class TestDataQueue {
       final boolean offered = q.offer(i);
       Assert.assertTrue(offered);
       numBytes += i;
-      assertSizes(i+1, numBytes);
+      assertSizes(i+1, numBytes, q);
     }
     {
       final boolean offered = q.offer(0);
       Assert.assertFalse(offered);
-      assertSizes(elementLimit, numBytes);
+      assertSizes(elementLimit, numBytes, q);
     }
 
     { // poll all elements
@@ -68,48 +73,53 @@ public class TestDataQueue {
         Assert.assertEquals(i, polled.get(i).intValue());
       }
     }
-    assertSizes(0, 0);
+    assertSizes(0, 0, q);
   }
 
   @Test(timeout = 1000)
   public void testByteLimit() {
-    assertSizes(0, 0);
+    runTestByteLimit(q);
+  }
+
+  static void runTestByteLimit(DataQueue<Integer> q) {
+    assertSizes(0, 0, q);
 
+    final int byteLimit = q.getByteLimit();
     try {
-      q.offer(byteLimit.getSizeInt() + 1);
+      q.offer(byteLimit + 1);
       Assert.fail();
     } catch (IllegalStateException ignored) {
     }
 
-    final int halfBytes = byteLimit.getSizeInt() / 2;
+    final int halfBytes = byteLimit / 2;
     {
       final boolean offered = q.offer(halfBytes);
       Assert.assertTrue(offered);
-      assertSizes(1, halfBytes);
+      assertSizes(1, halfBytes, q);
     }
 
     {
       final boolean offered = q.offer(halfBytes + 1);
       Assert.assertFalse(offered);
-      assertSizes(1, halfBytes);
+      assertSizes(1, halfBytes, q);
     }
 
     {
       final boolean offered = q.offer(halfBytes);
       Assert.assertTrue(offered);
-      assertSizes(2, byteLimit.getSizeInt());
+      assertSizes(2, byteLimit, q);
     }
 
     {
       final boolean offered = q.offer(1);
       Assert.assertFalse(offered);
-      assertSizes(2, byteLimit.getSizeInt());
+      assertSizes(2, byteLimit, q);
     }
 
     {
       final boolean offered = q.offer(0);
       Assert.assertTrue(offered);
-      assertSizes(3, byteLimit.getSizeInt());
+      assertSizes(3, byteLimit, q);
     }
 
     { // poll all elements
@@ -120,12 +130,12 @@ public class TestDataQueue {
       Assert.assertEquals(0, polled.get(2).intValue());
     }
 
-    assertSizes(0, 0);
+    assertSizes(0, 0, q);
   }
 
   @Test(timeout = 1000)
   public void testTimeout() {
-    assertSizes(0, 0);
+    assertSizes(0, 0, q);
 
     int numBytes = 0;
     for (int i = 0; i < elementLimit; i++) {
@@ -134,13 +144,13 @@ public class TestDataQueue {
       final boolean offered = q.offer(i);
       Assert.assertTrue(offered);
       numBytes += i;
-      assertSizes(i+1, numBytes);
+      assertSizes(i+1, numBytes, q);
     }
 
     { // poll with zero time
       final List<Integer> polled = q.pollList(0, (i, timeout) -> i, getTimeoutHandler(false));
       Assert.assertTrue(polled.isEmpty());
-      assertSizes(elementLimit, numBytes);
+      assertSizes(elementLimit, numBytes, q);
     }
 
     final int halfElements = elementLimit / 2;
@@ -157,7 +167,7 @@ public class TestDataQueue {
         Assert.assertEquals(i, polled.get(i).intValue());
         numBytes -= i;
       }
-      assertSizes(elementLimit - halfElements, numBytes);
+      assertSizes(elementLimit - halfElements, numBytes, q);
     }
 
     { // poll the remaining elements
@@ -167,6 +177,6 @@ public class TestDataQueue {
         Assert.assertEquals(halfElements + i, polled.get(i).intValue());
       }
     }
-    assertSizes(0, 0);
+    assertSizes(0, 0, q);
   }
 }


Mime
View raw message