hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject svn commit: r1593854 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java
Date Sun, 11 May 2014 20:36:51 GMT
Author: mbertozzi
Date: Sun May 11 20:36:50 2014
New Revision: 1593854

URL: http://svn.apache.org/r1593854
Log:
HBASE-11139 BoundedPriorityBlockingQueue#poll() should check the return value from awaitNanos()
(Shengzhe Yao)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java?rev=1593854&r1=1593853&r2=1593854&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
Sun May 11 20:36:50 2014
@@ -245,7 +245,7 @@ public class BoundedPriorityBlockingQueu
     E result = null;
     try {
       while (queue.size() == 0 && nanos > 0) {
-        notEmpty.awaitNanos(nanos);
+        nanos = notEmpty.awaitNanos(nanos);
       }
       if (queue.size() > 0) {
         result = queue.poll();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java?rev=1593854&r1=1593853&r2=1593854&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java
Sun May 11 20:36:50 2014
@@ -18,10 +18,16 @@
 package org.apache.hadoop.hbase.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
 import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.SmallTests;
@@ -175,4 +181,56 @@ public class TestBoundedPriorityBlocking
     }
     assertEquals(null, queue.poll());
   }
+
+  @Test
+  public void testPoll() {
+    assertNull(queue.poll());
+    PriorityQueue<TestObject> testList = new PriorityQueue<TestObject>(CAPACITY,
new TestObjectComparator());
+
+    for (int i = 0; i < CAPACITY; ++i) {
+      TestObject obj = new TestObject(i, i);
+      testList.add(obj);
+      queue.offer(obj);
+    }
+
+    for (int i = 0; i < CAPACITY; ++i) {
+      assertEquals(testList.poll(), queue.poll());
+    }
+
+    assertNull(null, queue.poll());
+  }
+
+  @Test(timeout=10000)
+  public void testPollInExecutor() throws InterruptedException {
+    final TestObject testObj = new TestObject(0, 0);
+
+    final CyclicBarrier threadsStarted = new CyclicBarrier(2);
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    executor.execute(new Runnable() {
+      public void run() {
+        try {
+          assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
+          threadsStarted.await();
+          assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
+          assertTrue(queue.isEmpty());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    executor.execute(new Runnable() {
+      public void run() {
+        try {
+            threadsStarted.await();
+            queue.offer(testObj);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
+  }
 }



Mime
View raw message