flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [26/50] [abbrv] flink git commit: [FLINK-3384] [kafka] Add ClosableQueue for message exchanges between Kafka Threads
Date Fri, 12 Feb 2016 11:29:51 GMT
[FLINK-3384] [kafka] Add ClosableQueue for message exchanges between Kafka Threads


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd324ea7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd324ea7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd324ea7

Branch: refs/heads/tableOnCalcite
Commit: fd324ea72979cc3d4202ffa3ea174ec4cc9d153b
Parents: 50bd65a
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 10 14:51:10 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 10 22:15:32 2016 +0100

----------------------------------------------------------------------
 .../kafka/internals/ClosableBlockingQueue.java  | 502 +++++++++++++++
 .../internals/ClosableBlockingQueueTest.java    | 603 +++++++++++++++++++
 2 files changed, 1105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd324ea7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
new file mode 100644
index 0000000..856c2ad
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A special form of blocking queue with two additions:
+ * <ol>
+ *     <li>The queue can be closed atomically when empty. Adding elements after the
queue
+ *         is closed fails. This allows queue consumers to atomically discover that no elements
+ *         are available and mark themselves as shut down.</li>
+ *     <li>The queue allows to poll batches of elements in one polling call.</li>
+ * </ol>
+ * 
+ * The queue has no capacity restriction and is safe for multiple producers and consumers.
+ * 
+ * <p>Note: Null elements are prohibited.
+ * 
+ * @param <E> The type of elements in the queue.
+ */
+public class ClosableBlockingQueue<E> {
+
+	/** The lock used to make queue accesses and open checks atomic */
+	private final ReentrantLock lock;
+	
+	/** The condition on which blocking get-calls wait if the queue is empty */
+	private final Condition nonEmpty;
+	
+	/** The deque of elements */
+	private final ArrayDeque<E> elements;
+	
+	/** Flag marking the status of the queue */
+	private volatile boolean open;
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new empty queue.
+	 */
+	public ClosableBlockingQueue() {
+		this(10);
+	}
+
+	/**
+	 * Creates a new empty queue, reserving space for at least the specified number
+	 * of elements. The queu can still grow, of more elements are added than the
+	 * reserved space.
+	 * 
+	 * @param initialSize The number of elements to reserve space for.
+	 */
+	public ClosableBlockingQueue(int initialSize) {
+		this.lock = new ReentrantLock(true);
+		this.nonEmpty = this.lock.newCondition();
+		
+		this.elements = new ArrayDeque<>(initialSize);
+		this.open = true;
+		
+		
+	}
+
+	/**
+	 * Creates a new queue that contains the given elements.
+	 * 
+	 * @param initialElements The elements to initially add to the queue.
+	 */
+	public ClosableBlockingQueue(Collection<? extends E> initialElements) {
+		this(initialElements.size());
+		this.elements.addAll(initialElements);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Size and status
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the number of elements currently in the queue.
+	 * @return The number of elements currently in the queue.
+	 */
+	public int size() {
+		return elements.size();
+	}
+
+	/**
+	 * Checks whether the queue is empty (has no elements).
+	 * @return True, if the queue is empty; false, if it is non-empty.
+	 */
+	public boolean isEmpty() {
+		return size() == 0;
+	}
+
+	/**
+	 * Checks whether the queue is currently open, meaning elements can be added and polled.
+	 * @return True, if the queue is open; false, if it is closed.
+	 */
+	public boolean isOpen() {
+		return open;
+	}
+	
+	/**
+	 * Tries to close the queue. Closing the queue only succeeds when no elements are
+	 * in the queue when this method is called. Checking whether the queue is empty, and
+	 * marking the queue as closed is one atomic operation.
+	 *
+	 * @return True, if the queue is closed, false if the queue remains open.
+	 */
+	public boolean close() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.isEmpty()) {
+					open = false;
+					nonEmpty.signalAll();
+					return true;
+				} else {
+					return false;
+				}
+			}
+			else {
+				// already closed
+				return true;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Adding / Removing elements
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Tries to add an element to the queue, if the queue is still open. Checking whether the
queue
+	 * is open and adding the element is one atomic operation.
+	 * 
+	 * <p>Unlike the {@link #add(Object)} method, this method never throws an exception,
+	 * but only indicates via the return code if the element was added or the
+	 * queue was closed.
+	 * 
+	 * @param element The element to add.
+	 * @return True, if the element was added, false if the queue was closes.
+	 */
+	public boolean addIfOpen(E element) {
+		requireNonNull(element);
+		
+		lock.lock();
+		try {
+			if (open) {
+				elements.addLast(element);
+				if (elements.size() == 1) {
+					nonEmpty.signalAll();
+				}
+			}
+			return open;
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Adds the element to the queue, or fails with an exception, if the queue is closed.
+	 * Checking whether the queue is open and adding the element is one atomic operation.
+	 * 
+	 * @param element The element to add.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public void add(E element) throws IllegalStateException {
+		requireNonNull(element);
+
+		lock.lock();
+		try {
+			if (open) {
+				elements.addLast(element);
+				if (elements.size() == 1) {
+					nonEmpty.signalAll();
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the queue's next element without removing it, if the queue is non-empty.
+	 * Otherwise, returns null. 
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and getting the next element is one atomic operation.
+	 * 
+	 * <p>This method never blocks.
+	 * 
+	 * @return The queue's next element, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public E peek() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					return elements.getFirst();
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the queue's next element and removes it, the queue is non-empty.
+	 * Otherwise, this method returns null. 
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 *
+	 * <p>This method never blocks.
+	 *
+	 * @return The queue's next element, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public E poll() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					return elements.removeFirst();
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns all of the queue's current elements in a list, if the queue is non-empty.
+	 * Otherwise, this method returns null. 
+	 *
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the elements is one atomic operation.
+	 *
+	 * <p>This method never blocks.
+	 *
+	 * @return All of the queue's elements, or null, if the queue is empty.
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 */
+	public List<E> pollBatch() {
+		lock.lock();
+		try {
+			if (open) {
+				if (elements.size() > 0) {
+					ArrayList<E> result = new ArrayList<>(elements);
+					elements.clear();
+					return result;
+				} else {
+					return null;
+				}
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the next element in the queue. If the queue is empty, this method
+	 * waits until at least one element is added.
+	 * 
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 * 
+	 * @return The next element in the queue, never null.
+	 * 
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public E getElementBlocking() throws InterruptedException {
+		lock.lock();
+		try {
+			while (open && elements.isEmpty()) {
+				nonEmpty.await();
+			}
+			
+			if (open) {
+				return elements.removeFirst();
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the next element in the queue. If the queue is empty, this method
+	 * waits at most a certain time until an element becomes available. If no element
+	 * is available after that time, the method returns null.
+	 * 
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 * 
+	 * @param timeoutMillis The number of milliseconds to block, at most.
+	 * @return The next element in the queue, or null, if the timeout expires  before an element
is available.
+	 * 
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public E getElementBlocking(long timeoutMillis) throws InterruptedException {
+		if (timeoutMillis == 0L) {
+			// wait forever case
+			return getElementBlocking();
+		} else if (timeoutMillis < 0L) {
+			throw new IllegalArgumentException("invalid timeout");
+		}
+		
+		final long deadline = System.currentTimeMillis() + timeoutMillis;
+		
+		lock.lock();
+		try {
+			while (open && elements.isEmpty() && timeoutMillis > 0) { 
+				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
+				timeoutMillis = deadline - System.currentTimeMillis();
+			}
+			
+			if (!open) {
+				throw new IllegalStateException("queue is closed");
+			}
+			else if (elements.isEmpty()) {
+				return null;
+			} else {
+				return elements.removeFirst();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Gets all the elements found in the list, or blocks until at least one element
+	 * was added. If the queue is empty when this method is called, it blocks until
+	 * at least one element is added.
+	 *
+	 * <p>This method always returns a list with at least one element.
+	 * 
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 * 
+	 * @return A list with all elements in the queue, always at least one element.
+	 * 
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public List<E> getBatchBlocking() throws InterruptedException {
+		lock.lock();
+		try {
+			while (open && elements.isEmpty()) {
+				nonEmpty.await();
+			}
+			if (open) {
+				ArrayList<E> result = new ArrayList<>(elements);
+				elements.clear();
+				return result;
+			} else {
+				throw new IllegalStateException("queue is closed");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Gets all the elements found in the list, or blocks until at least one element
+	 * was added. This method is similar as {@link #getBatchBlocking()}, but takes
+	 * a number of milliseconds that the method will maximally wait before returning.
+	 * 
+	 * <p>This method never returns null, but an empty list, if the queue is empty when
+	 * the method is called and the request times out before an element was added.
+	 * 
+	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
+	 * Checking whether the queue is open and removing the next element is one atomic operation.
+	 * 
+	 * @param timeoutMillis The number of milliseconds to wait, at most.
+	 * @return A list with all elements in the queue, possible an empty list.
+	 *
+	 * @throws IllegalStateException Thrown, if the queue is closed.
+	 * @throws InterruptedException Throw, if the thread is interrupted while waiting for an
+	 *                              element to be added.
+	 */
+	public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException {
+		if (timeoutMillis == 0L) {
+			// wait forever case
+			return getBatchBlocking();
+		} else if (timeoutMillis < 0L) {
+			throw new IllegalArgumentException("invalid timeout");
+		}
+
+		final long deadline = System.currentTimeMillis() + timeoutMillis;
+
+		lock.lock();
+		try {
+			while (open && elements.isEmpty() && timeoutMillis > 0) {
+				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
+				timeoutMillis = deadline - System.currentTimeMillis();
+			}
+
+			if (!open) {
+				throw new IllegalStateException("queue is closed");
+			}
+			else if (elements.isEmpty()) {
+				return Collections.emptyList();
+			}
+			else {
+				ArrayList<E> result = new ArrayList<>(elements);
+				elements.clear();
+				return result;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Standard Utilities
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		int hashCode = 17;
+		for (E element : elements) {
+			hashCode = 31 * hashCode + element.hashCode();
+		}
+		return hashCode;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		} else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) {
+			@SuppressWarnings("unchecked")
+			ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj;
+			
+			if (this.elements.size() == that.elements.size()) {
+				Iterator<E> thisElements = this.elements.iterator();
+				for (E thatNext : that.elements) {
+					E thisNext = thisElements.next();
+					if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) {
+						return false;
+					}
+				}
+				return true;
+			} else {
+				return false;
+			}
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return elements.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd324ea7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
new file mode 100644
index 0000000..6298c92
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
+public class ClosableBlockingQueueTest {
+
+	// ------------------------------------------------------------------------
+	//  single-threaded unit tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testCreateQueueHashCodeEquals() {
+		try {
+			ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>();
+			ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(22);
+
+			assertTrue(queue1.isOpen());
+			assertTrue(queue2.isOpen());
+			assertTrue(queue1.isEmpty());
+			assertTrue(queue2.isEmpty());
+			assertEquals(0, queue1.size());
+			assertEquals(0, queue2.size());
+			
+			assertTrue(queue1.hashCode() == queue2.hashCode());
+			//noinspection EqualsWithItself
+			assertTrue(queue1.equals(queue1));
+			//noinspection EqualsWithItself
+			assertTrue(queue2.equals(queue2));
+			assertTrue(queue1.equals(queue2));
+			
+			assertNotNull(queue1.toString());
+			assertNotNull(queue2.toString());
+
+			List<String> elements = new ArrayList<>();
+			elements.add("a");
+			elements.add("b");
+			elements.add("c");
+
+			ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(elements);
+			ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(asList("a",
"b", "c"));
+
+			assertTrue(queue3.isOpen());
+			assertTrue(queue4.isOpen());
+			assertFalse(queue3.isEmpty());
+			assertFalse(queue4.isEmpty());
+			assertEquals(3, queue3.size());
+			assertEquals(3, queue4.size());
+
+			assertTrue(queue3.hashCode() == queue4.hashCode());
+			//noinspection EqualsWithItself
+			assertTrue(queue3.equals(queue3));
+			//noinspection EqualsWithItself
+			assertTrue(queue4.equals(queue4));
+			assertTrue(queue3.equals(queue4));
+			
+			assertNotNull(queue3.toString());
+			assertNotNull(queue4.toString());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCloseEmptyQueue() {
+		try {
+			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+			assertTrue(queue.isOpen());
+			assertTrue(queue.close());
+			assertFalse(queue.isOpen());
+			
+			assertFalse(queue.addIfOpen("element"));
+			assertTrue(queue.isEmpty());
+			
+			try {
+				queue.add("some element");
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testCloseNonEmptyQueue() {
+		try {
+			ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(asList(1,
2, 3));
+			assertTrue(queue.isOpen());
+			
+			assertFalse(queue.close());
+			assertFalse(queue.close());
+			
+			queue.poll();
+
+			assertFalse(queue.close());
+			assertFalse(queue.close());
+			
+			queue.pollBatch();
+
+			assertTrue(queue.close());
+			assertFalse(queue.isOpen());
+
+			assertFalse(queue.addIfOpen(42));
+			assertTrue(queue.isEmpty());
+
+			try {
+				queue.add(99);
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPeekAndPoll() {
+		try {
+			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+			
+			assertNull(queue.peek());
+			assertNull(queue.peek());
+			assertNull(queue.poll());
+			assertNull(queue.poll());
+			
+			assertEquals(0, queue.size());
+			
+			queue.add("a");
+			queue.add("b");
+			queue.add("c");
+
+			assertEquals(3, queue.size());
+			
+			assertEquals("a", queue.peek());
+			assertEquals("a", queue.peek());
+			assertEquals("a", queue.peek());
+
+			assertEquals(3, queue.size());
+			
+			assertEquals("a", queue.poll());
+			assertEquals("b", queue.poll());
+
+			assertEquals(1, queue.size());
+			
+			assertEquals("c", queue.peek());
+			assertEquals("c", queue.peek());
+
+			assertEquals("c", queue.poll());
+
+			assertEquals(0, queue.size());
+			assertNull(queue.poll());
+			assertNull(queue.peek());
+			assertNull(queue.peek());
+			
+			assertTrue(queue.close());
+			
+			try {
+				queue.peek();
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+
+			try {
+				queue.poll();
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPollBatch() {
+		try {
+			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+
+			assertNull(queue.pollBatch());
+			
+			queue.add("a");
+			queue.add("b");
+			
+			assertEquals(asList("a", "b"), queue.pollBatch());
+			assertNull(queue.pollBatch());
+			
+			queue.add("c");
+
+			assertEquals(singletonList("c"), queue.pollBatch());
+			assertNull(queue.pollBatch());
+
+			assertTrue(queue.close());
+
+			try {
+				queue.pollBatch();
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGetElementBlocking() {
+		try {
+			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+
+			assertNull(queue.getElementBlocking(1));
+			assertNull(queue.getElementBlocking(3));
+			assertNull(queue.getElementBlocking(2));
+
+			assertEquals(0, queue.size());
+
+			queue.add("a");
+			queue.add("b");
+			queue.add("c");
+			queue.add("d");
+			queue.add("e");
+			queue.add("f");
+
+			assertEquals(6, queue.size());
+
+			assertEquals("a", queue.getElementBlocking(99));
+			assertEquals("b", queue.getElementBlocking());
+
+			assertEquals(4, queue.size());
+
+			assertEquals("c", queue.getElementBlocking(0));
+			assertEquals("d", queue.getElementBlocking(1000000));
+			assertEquals("e", queue.getElementBlocking());
+			assertEquals("f", queue.getElementBlocking(1786598));
+
+			assertEquals(0, queue.size());
+
+			assertNull(queue.getElementBlocking(1));
+			assertNull(queue.getElementBlocking(3));
+			assertNull(queue.getElementBlocking(2));
+
+			assertTrue(queue.close());
+
+			try {
+				queue.getElementBlocking();
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+
+			try {
+				queue.getElementBlocking(1000000000L);
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGetBatchBlocking() {
+		try {
+			ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
+
+			assertEquals(emptyList(), queue.getBatchBlocking(1));
+			assertEquals(emptyList(), queue.getBatchBlocking(3));
+			assertEquals(emptyList(), queue.getBatchBlocking(2));
+
+			queue.add("a");
+			queue.add("b");
+
+			assertEquals(asList("a", "b"), queue.getBatchBlocking(900000009));
+
+			queue.add("c");
+			queue.add("d");
+
+			assertEquals(asList("c", "d"), queue.getBatchBlocking());
+
+			assertEquals(emptyList(), queue.getBatchBlocking(2));
+
+			queue.add("e");
+
+			assertEquals(singletonList("e"), queue.getBatchBlocking(0));
+
+			queue.add("f");
+
+			assertEquals(singletonList("f"), queue.getBatchBlocking(1000000000));
+
+			assertEquals(0, queue.size());
+
+			assertEquals(emptyList(), queue.getBatchBlocking(1));
+			assertEquals(emptyList(), queue.getBatchBlocking(3));
+			assertEquals(emptyList(), queue.getBatchBlocking(2));
+
+			assertTrue(queue.close());
+
+			try {
+				queue.getBatchBlocking();
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+
+			try {
+				queue.getBatchBlocking(1000000000L);
+				fail("should cause an exception");
+			} catch (IllegalStateException ignored) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  multi-threaded tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void notifyOnClose() {
+		try {
+			final long oneYear = 365L * 24 * 60 * 60 * 1000;
+			
+			// test "getBatchBlocking()"
+			final ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>();
+			QueueCall call1 = new QueueCall() {
+				@Override
+				public void call() throws Exception {
+					queue1.getBatchBlocking();
+				}
+			};
+			testCallExitsOnClose(call1, queue1);
+
+			// test "getBatchBlocking()"
+			final ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>();
+			QueueCall call2 = new QueueCall() {
+				@Override
+				public void call() throws Exception {
+					queue2.getBatchBlocking(oneYear);
+				}
+			};
+			testCallExitsOnClose(call2, queue2);
+
+			// test "getBatchBlocking()"
+			final ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>();
+			QueueCall call3 = new QueueCall() {
+				@Override
+				public void call() throws Exception {
+					queue3.getElementBlocking();
+				}
+			};
+			testCallExitsOnClose(call3, queue3);
+
+			// test "getBatchBlocking()"
+			final ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>();
+			QueueCall call4 = new QueueCall() {
+				@Override
+				public void call() throws Exception {
+					queue4.getElementBlocking(oneYear);
+				}
+			};
+			testCallExitsOnClose(call4, queue4);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+	@Test
+	public void testMultiThreadedAddGet() {
+		try {
+			final ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>();
+			final AtomicReference<Throwable> pushErrorRef = new AtomicReference<>();
+			final AtomicReference<Throwable> pollErrorRef = new AtomicReference<>();
+			
+			final int numElements = 2000;
+			
+			Thread pusher = new Thread("pusher") {
+
+				@Override
+				public void run() {
+					try {
+						final Random rnd = new Random();
+						for (int i = 0; i < numElements; i++) {
+							queue.add(i);
+							
+							// sleep a bit, sometimes
+							int sleepTime = rnd.nextInt(3);
+							if (sleepTime > 1) {
+								Thread.sleep(sleepTime);
+							}
+						}
+						
+						while (true) {
+							if (queue.close()) {
+								break;
+							} else {
+								Thread.sleep(5);
+							}
+						}
+					} catch (Throwable t) {
+						pushErrorRef.set(t);
+					}
+				}
+			};
+			pusher.start();
+
+			Thread poller = new Thread("poller") {
+
+				@SuppressWarnings("InfiniteLoopStatement")
+				@Override
+				public void run() {
+					try {
+						int count = 0;
+						
+						try {
+							final Random rnd = new Random();
+							int nextExpected = 0;
+							
+							while (true) {
+								int getMethod = count % 7;
+								switch (getMethod) {
+									case 0: {
+										Integer next = queue.getElementBlocking(1);
+										if (next != null) {
+											assertEquals(nextExpected, next.intValue());
+											nextExpected++;
+											count++;
+										}
+										break;
+									}
+									case 1: {
+										List<Integer> nextList = queue.getBatchBlocking();
+										for (Integer next : nextList) {
+											assertNotNull(next);
+											assertEquals(nextExpected, next.intValue());
+											nextExpected++;
+											count++;
+										}
+										break;
+									}
+									case 2: {
+										List<Integer> nextList = queue.getBatchBlocking(1);
+										if (nextList != null) {
+											for (Integer next : nextList) {
+												assertNotNull(next);
+												assertEquals(nextExpected, next.intValue());
+												nextExpected++;
+												count++;
+											}
+										}
+										break;
+									}
+									case 3: {
+										Integer next = queue.poll();
+										if (next != null) {
+											assertEquals(nextExpected, next.intValue());
+											nextExpected++;
+											count++;
+										}
+										break;
+									}
+									case 4: {
+										List<Integer> nextList = queue.pollBatch();
+										if (nextList != null) {
+											for (Integer next : nextList) {
+												assertNotNull(next);
+												assertEquals(nextExpected, next.intValue());
+												nextExpected++;
+												count++;
+											}
+										}
+										break;
+									}
+									default: {
+										Integer next = queue.getElementBlocking();
+										assertNotNull(next);
+										assertEquals(nextExpected, next.intValue());
+										nextExpected++;
+										count++;
+									}
+								}
+								
+								// sleep a bit, sometimes
+								int sleepTime = rnd.nextInt(3);
+								if (sleepTime > 1) {
+									Thread.sleep(sleepTime);
+								}
+							}
+						} catch (IllegalStateException e) {
+							// we get this once the queue is closed
+							assertEquals(numElements, count);
+						}
+					} catch (Throwable t) {
+						pollErrorRef.set(t);
+					}
+				}
+			};
+			poller.start();
+			
+			pusher.join();
+			poller.join();
+			
+			if (pushErrorRef.get() != null) {
+				Throwable t = pushErrorRef.get();
+				t.printStackTrace();
+				fail("Error in pusher: " + t.getMessage());
+			}
+			if (pollErrorRef.get() != null) {
+				Throwable t = pollErrorRef.get();
+				t.printStackTrace();
+				fail("Error in poller: " + t.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+	
+	private static void testCallExitsOnClose(
+			final QueueCall call, ClosableBlockingQueue<String> queue) throws Exception {
+		
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		
+		Runnable runnable = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					call.call();
+				} catch (Throwable t) {
+					errorRef.set(t);
+				}
+			}
+		};
+
+		Thread thread = new Thread(runnable);
+		thread.start();
+		Thread.sleep(100);
+		queue.close();
+		thread.join();
+
+		@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+		Throwable cause = errorRef.get();
+		assertTrue(cause instanceof IllegalStateException);
+	}
+	
+	private interface QueueCall {
+		void call() throws Exception;
+	}
+}


Mime
View raw message