Repository: hbase
Updated Branches:
refs/heads/master 91a7bbd58 -> 9a94dc90b
HBASE-16642 Use DelayQueue instead of TimeoutBlockingQueue
Signed-off-by: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a94dc90
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a94dc90
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a94dc90
Branch: refs/heads/master
Commit: 9a94dc90b4a53ca9d82c662f19c0369d3cd9aecf
Parents: 91a7bbd
Author: Hiroshi Ikeda <ikeda@vic.co.jp>
Authored: Thu Oct 13 20:21:46 2016 -0700
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Thu Oct 13 21:41:54 2016 -0700
----------------------------------------------------------------------
.../hbase/procedure2/ProcedureExecutor.java | 93 +++++---
.../procedure2/util/TimeoutBlockingQueue.java | 234 -------------------
.../util/TestTimeoutBlockingQueue.java | 159 -------------
3 files changed, 67 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 2e9e3a3..14fe71b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -29,12 +29,15 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@@ -48,8 +51,6 @@ import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
-import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -96,17 +97,58 @@ public class ProcedureExecutor<TEnvironment> {
}
/**
- * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure
+ * Wraps a Procedure for the DelayQueue, which orders entries by their absolute timeout time
*/
- private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure>
{
+ private static class DelayedContainer implements Delayed {
+ static final DelayedContainer POISON = new DelayedContainer();
+
+ /** null if poison */
+ final Procedure proc;
+ final long timeoutTime;
+
+ DelayedContainer(Procedure proc) {
+ assert proc != null;
+ this.proc = proc;
+ this.timeoutTime = proc.getLastUpdate() + proc.getTimeout();
+ }
+
+ DelayedContainer() {
+ this.proc = null;
+ this.timeoutTime = Long.MIN_VALUE;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ // timeoutTime and the current time are both millisecond timestamps, so the
+ // remaining time must be converted from MILLISECONDS; converting from a finer
+ // unit under-reports the delay and makes DelayQueue.take() wake up far too early.
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ if (currentTime >= timeoutTime) {
+ return 0;
+ }
+ return unit.convert(timeoutTime - currentTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ */
+ @Override
+ public int compareTo(Delayed o) {
+ return Long.compare(timeoutTime, ((DelayedContainer)o).timeoutTime);
+ }
+
@Override
- public long getTimeout(Procedure proc) {
- return proc.getTimeRemaining();
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (! (obj instanceof DelayedContainer)) {
+ return false;
+ }
+ return Objects.equals(proc, ((DelayedContainer)obj).proc);
}
@Override
- public TimeUnit getTimeUnit(Procedure proc) {
- return TimeUnit.MILLISECONDS;
+ public int hashCode() {
+ return proc != null ? proc.hashCode() : 0;
}
}
@@ -239,8 +281,8 @@ public class ProcedureExecutor<TEnvironment> {
* Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
* or periodic procedures.
*/
- private final TimeoutBlockingQueue<Procedure> waitingTimeout =
- new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
+ private final DelayQueue<DelayedContainer> waitingTimeout =
+ new DelayQueue<DelayedContainer>();
/**
* Scheduler/Queue that contains runnable procedures.
@@ -544,7 +586,7 @@ public class ProcedureExecutor<TEnvironment> {
LOG.info("Stopping the procedure executor");
scheduler.stop();
- waitingTimeout.signalAll();
+ waitingTimeout.add(DelayedContainer.POISON);
}
public void join() {
@@ -628,7 +670,7 @@ public class ProcedureExecutor<TEnvironment> {
*/
public void addChore(final ProcedureInMemoryChore chore) {
chore.setState(ProcedureState.RUNNABLE);
- waitingTimeout.add(chore);
+ waitingTimeout.add(new DelayedContainer(chore));
}
/**
@@ -638,7 +680,7 @@ public class ProcedureExecutor<TEnvironment> {
*/
public boolean removeChore(final ProcedureInMemoryChore chore) {
chore.setState(ProcedureState.FINISHED);
- return waitingTimeout.remove(chore);
+ return waitingTimeout.remove(new DelayedContainer(chore));
}
/**
@@ -927,15 +969,16 @@ public class ProcedureExecutor<TEnvironment> {
private void timeoutLoop() {
while (isRunning()) {
- Procedure proc = waitingTimeout.poll();
- if (proc == null) continue;
-
- if (proc.getTimeRemaining() > 100) {
- // got an early wake, maybe a stop?
- // re-enqueue the task in case was not a stop or just a signal
- waitingTimeout.add(proc);
+ Procedure proc;
+ try {
+ proc = waitingTimeout.take().proc;
+ } catch (InterruptedException e) {
+ // Just consume the interruption.
continue;
}
+ if (proc == null) { // POISON to stop
+ break;
+ }
// ----------------------------------------------------------------------------
// TODO-MAYBE: Should we provide a notification to the store with the
@@ -955,8 +998,8 @@ public class ProcedureExecutor<TEnvironment> {
} catch (Throwable e) {
LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(),
e);
}
- proc.setStartTime(EnvironmentEdgeManager.currentTime());
- if (proc.isRunnable()) waitingTimeout.add(proc);
+ proc.updateTimestamp();
+ if (proc.isRunnable()) waitingTimeout.add(new DelayedContainer(proc));
}
continue;
}
@@ -970,8 +1013,6 @@ public class ProcedureExecutor<TEnvironment> {
store.update(proc);
scheduler.addFront(proc);
continue;
- } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
- waitingTimeout.add(proc);
}
}
}
@@ -1171,7 +1212,7 @@ public class ProcedureExecutor<TEnvironment> {
procedure.setState(ProcedureState.WAITING);
break;
case WAITING_TIMEOUT:
- waitingTimeout.add(procedure);
+ waitingTimeout.add(new DelayedContainer(procedure));
break;
default:
break;
@@ -1179,7 +1220,7 @@ public class ProcedureExecutor<TEnvironment> {
}
}
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
- waitingTimeout.add(procedure);
+ waitingTimeout.add(new DelayedContainer(procedure));
} else if (!isSuspended) {
// No subtask, so we are done
procedure.setState(ProcedureState.FINISHED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
deleted file mode 100644
index 2292e63..0000000
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2.util;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class TimeoutBlockingQueue<E> {
- public static interface TimeoutRetriever<T> {
- long getTimeout(T object);
- TimeUnit getTimeUnit(T object);
- }
-
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition waitCond = lock.newCondition();
- private final TimeoutRetriever<? super E> timeoutRetriever;
-
- private E[] objects;
- private int head = 0;
- private int tail = 0;
-
- public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
- this(32, timeoutRetriever);
- }
-
- @SuppressWarnings("unchecked")
- public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever)
{
- this.objects = (E[])new Object[capacity];
- this.timeoutRetriever = timeoutRetriever;
- }
-
- public void dump() {
- for (int i = 0; i < objects.length; ++i) {
- if (i == head) {
- System.out.print("[" + objects[i] + "] ");
- } else if (i == tail) {
- System.out.print("]" + objects[i] + "[ ");
- } else {
- System.out.print(objects[i] + " ");
- }
- }
- System.out.println();
- }
-
- public void clear() {
- lock.lock();
- try {
- if (head != tail) {
- for (int i = head; i < tail; ++i) {
- objects[i] = null;
- }
- head = 0;
- tail = 0;
- waitCond.signal();
- }
- } finally {
- lock.unlock();
- }
- }
-
- public void add(E e) {
- if (e == null) throw new NullPointerException();
-
- lock.lock();
- try {
- addElement(e);
- waitCond.signal();
- } finally {
- lock.unlock();
- }
- }
-
- public boolean remove(E e) {
- if (e == null) return false;
- lock.lock();
- try {
- for (int i = 0; i < objects.length; ++i) {
- if (e.equals(objects[i])) {
- objects[i] = null;
- return true;
- }
- }
- return false;
- } finally {
- lock.unlock();
- }
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
- public E poll() {
- lock.lock();
- try {
- if (isEmpty()) {
- waitCond.await();
- return null;
- }
-
- E elem = objects[head];
- long nanos = getNanosTimeout(elem);
- nanos = waitCond.awaitNanos(nanos);
- return nanos > 0 ? null : removeFirst();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- } finally {
- lock.unlock();
- }
- }
-
- public int size() {
- return tail - head;
- }
-
- public boolean isEmpty() {
- return (tail - head) == 0;
- }
-
- public void signalAll() {
- lock.lock();
- try {
- waitCond.signalAll();
- } finally {
- lock.unlock();
- }
- }
-
- private void addElement(E elem) {
- int size = (tail - head);
- if ((objects.length - size) == 0) {
- int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
- E[] newObjects = (E[])new Object[capacity];
-
- if (compareTimeouts(objects[tail - 1], elem) <= 0) {
- // Append
- System.arraycopy(objects, head, newObjects, 0, tail);
- tail -= head;
- newObjects[tail++] = elem;
- } else if (compareTimeouts(objects[head], elem) > 0) {
- // Prepend
- System.arraycopy(objects, head, newObjects, 1, tail);
- newObjects[0] = elem;
- tail -= (head - 1);
- } else {
- // Insert in the middle
- int index = upperBound(head, tail - 1, elem);
- int newIndex = (index - head);
- System.arraycopy(objects, head, newObjects, 0, newIndex);
- newObjects[newIndex] = elem;
- System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index);
- tail -= (head - 1);
- }
- head = 0;
- objects = newObjects;
- } else {
- if (tail == objects.length) {
- // shift down |-----AAAAAAA|
- tail -= head;
- System.arraycopy(objects, head, objects, 0, tail);
- head = 0;
- }
-
- if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
- // Append
- objects[tail++] = elem;
- } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
- // Prepend
- objects[--head] = elem;
- } else {
- // Insert in the middle
- int index = upperBound(head, tail - 1, elem);
- System.arraycopy(objects, index, objects, index + 1, tail - index);
- objects[index] = elem;
- tail++;
- }
- }
- }
-
- private E removeFirst() {
- E elem = objects[head];
- objects[head] = null;
- head = (head + 1) % objects.length;
- if (head == 0) tail = 0;
- return elem;
- }
-
- private int upperBound(int start, int end, E key) {
- while (start < end) {
- int mid = (start + end) >>> 1;
- E mitem = objects[mid];
- int cmp = compareTimeouts(mitem, key);
- if (cmp > 0) {
- end = mid;
- } else {
- start = mid + 1;
- }
- }
- return start;
- }
-
- private int compareTimeouts(final E a, final E b) {
- long t1 = getNanosTimeout(a);
- long t2 = getNanosTimeout(b);
- return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
- }
-
- private long getNanosTimeout(final E obj) {
- if (obj == null) return 0;
- TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
- long timeout = timeoutRetriever.getTimeout(obj);
- return unit.toNanos(timeout);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
deleted file mode 100644
index 1f901b5..0000000
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2.util;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.hbase.CategoryBasedTimeout;
-import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestRule;
-
-@Category({MasterTests.class, MediumTests.class})
-public class TestTimeoutBlockingQueue {
- @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
- withLookingForStuckThread(true).build();
- static class TestObject {
- private long timeout;
- private int seqId;
-
- public TestObject(int seqId, long timeout) {
- this.timeout = timeout;
- this.seqId = seqId;
- }
-
- public long getTimeout() {
- return timeout;
- }
-
- public String toString() {
- return String.format("(%03d, %03d)", seqId, timeout);
- }
- }
-
- static class TestObjectTimeoutRetriever implements TimeoutRetriever<TestObject> {
- @Override
- public long getTimeout(TestObject obj) {
- return obj.getTimeout();
- }
-
- @Override
- public TimeUnit getTimeUnit(TestObject obj) {
- return TimeUnit.MILLISECONDS;
- }
- }
-
- @Test
- public void testOrder() {
- TimeoutBlockingQueue<TestObject> queue =
- new TimeoutBlockingQueue<TestObject>(8, new TestObjectTimeoutRetriever());
-
- long[] timeouts = new long[] {500, 200, 700, 300, 600, 600, 200, 800, 500};
-
- for (int i = 0; i < timeouts.length; ++i) {
- for (int j = 0; j <= i; ++j) {
- queue.add(new TestObject(j, timeouts[j]));
- queue.dump();
- }
-
- long prev = 0;
- for (int j = 0; j <= i; ++j) {
- TestObject obj = queue.poll();
- assertTrue(obj.getTimeout() >= prev);
- prev = obj.getTimeout();
- queue.dump();
- }
- }
- }
-
- @Test
- public void testTimeoutBlockingQueue() {
- TimeoutBlockingQueue<TestObject> queue;
-
- int[][] testArray = new int[][] {
- {200, 400, 600}, // append
- {200, 400, 100}, // prepend
- {200, 400, 300}, // insert
- };
-
- for (int i = 0; i < testArray.length; ++i) {
- int[] sortedArray = Arrays.copyOf(testArray[i], testArray[i].length);
- Arrays.sort(sortedArray);
-
- // test with head == 0
- queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
- for (int j = 0; j < testArray[i].length; ++j) {
- queue.add(new TestObject(j, testArray[i][j]));
- queue.dump();
- }
-
- for (int j = 0; !queue.isEmpty(); ++j) {
- assertEquals(sortedArray[j], queue.poll().getTimeout());
- }
-
- queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
- queue.add(new TestObject(0, 50));
- assertEquals(50, queue.poll().getTimeout());
-
- // test with head > 0
- for (int j = 0; j < testArray[i].length; ++j) {
- queue.add(new TestObject(j, testArray[i][j]));
- queue.dump();
- }
-
- for (int j = 0; !queue.isEmpty(); ++j) {
- assertEquals(sortedArray[j], queue.poll().getTimeout());
- }
- }
- }
-
- @Test
- public void testRemove() {
- TimeoutBlockingQueue<TestObject> queue =
- new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
-
- final int effectiveLen = 5;
- TestObject[] objs = new TestObject[6];
- for (int i = 0; i < effectiveLen; ++i) {
- objs[i] = new TestObject(0, i * 10);
- queue.add(objs[i]);
- }
- objs[effectiveLen] = new TestObject(0, effectiveLen * 10);
- queue.dump();
-
- for (int i = 0; i < effectiveLen; i += 2) {
- assertTrue(queue.remove(objs[i]));
- }
- assertTrue(!queue.remove(objs[effectiveLen]));
-
- for (int i = 0; i < effectiveLen; ++i) {
- TestObject x = queue.poll();
- assertEquals((i % 2) == 0 ? null : objs[i], x);
- }
- }
-}
|