commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brit...@apache.org
Subject [03/12] [lang] [LANG-1275] Added a tryAcquire() method to TimedSemaphore.
Date Thu, 13 Oct 2016 19:18:09 GMT
[LANG-1275] Added a tryAcquire() method to TimedSemaphore.

This method supports non-blocking use cases.


Project: http://git-wip-us.apache.org/repos/asf/commons-lang/repo
Commit: http://git-wip-us.apache.org/repos/asf/commons-lang/commit/809e2bed
Tree: http://git-wip-us.apache.org/repos/asf/commons-lang/tree/809e2bed
Diff: http://git-wip-us.apache.org/repos/asf/commons-lang/diff/809e2bed

Branch: refs/heads/release
Commit: 809e2bed22f02264e0ac58bba070f6d84386cfc5
Parents: 496506d
Author: oheger <oliver.heger@oliver-heger.de>
Authored: Sat Oct 8 16:47:10 2016 +0200
Committer: oheger <oliver.heger@oliver-heger.de>
Committed: Sat Oct 8 16:47:10 2016 +0200

----------------------------------------------------------------------
 .../lang3/concurrent/TimedSemaphore.java        | 77 ++++++++++++++++----
 .../lang3/concurrent/TimedSemaphoreTest.java    | 67 +++++++++++++++++
 2 files changed, 129 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/commons-lang/blob/809e2bed/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
index 9e47b16..a3517bd 100644
--- a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
+++ b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
@@ -16,13 +16,13 @@
  */
 package org.apache.commons.lang3.concurrent;
 
+import org.apache.commons.lang3.Validate;
+
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.Validate;
-
 /**
  * <p>
  * A specialized <em>semaphore</em> implementation that provides a number of
@@ -99,13 +99,22 @@ import org.apache.commons.lang3.Validate;
  * </p>
  * <p>
  * Client code that uses {@code TimedSemaphore} has to call the
- * {@link #acquire()} method in aach processing step. {@code TimedSemaphore}
+ * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
  * keeps track of the number of invocations of the {@link #acquire()} method and
  * blocks the calling thread if the counter exceeds the limit specified. When
  * the timer signals the end of the time period the counter is reset and all
  * waiting threads are released. Then another cycle can start.
  * </p>
  * <p>
+ * An alternative to {@code acquire()} is the {@link #tryAcquire()} method. This
+ * method checks whether the semaphore is under the specified limit and
+ * increases the internal counter if this is the case. The return value is then
+ * <strong>true</strong>, and the calling thread can continue with its action.
+ * If the semaphore is already at its limit, {@code tryAcquire()} immediately
+ * returns <strong>false</strong> without blocking; the calling thread must
+ * then abort its action. This usage scenario prevents blocking of threads.
+ * </p>
+ * <p>
  * It is possible to modify the limit at any time using the
  * {@link #setLimit(int)} method. This is useful if the load produced by an
  * operation has to be adapted dynamically. In the example scenario with the
@@ -280,7 +289,7 @@ public class TimedSemaphore {
     }
 
     /**
-     * Tries to acquire a permit from this semaphore. This method will block if
+     * Acquires a permit from this semaphore. This method will block if
      * the limit for the current period has already been reached. If
      * {@link #shutdown()} has already been invoked, calling this method will
      * cause an exception. The very first call of this method starts the timer
@@ -291,26 +300,34 @@ public class TimedSemaphore {
      * @throws IllegalStateException if this semaphore is already shut down
      */
     public synchronized void acquire() throws InterruptedException {
-        if (isShutdown()) {
-            throw new IllegalStateException("TimedSemaphore is shut down!");
-        }
+        prepareAcquire();
 
-        if (task == null) {
-            task = startTimer();
-        }
-
-        boolean canPass = false;
+        boolean canPass;
         do {
-            canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit();
+            canPass = acquirePermit();
             if (!canPass) {
                 wait();
-            } else {
-                acquireCount++;
             }
         } while (!canPass);
     }
 
     /**
+     * Tries to acquire a permit from this semaphore. If the limit of this semaphore has
+     * not yet been reached, a permit is acquired, and this method returns
+     * <strong>true</strong>. Otherwise, this method returns immediately with the result
+     * <strong>false</strong>.
+     *
+     * @return <strong>true</strong> if a permit could be acquired; <strong>false</strong>
+     * otherwise
+     * @throws IllegalStateException if this semaphore is already shut down
+     * @since 3.5
+     */
+    public synchronized boolean tryAcquire() {
+        prepareAcquire();
+        return acquirePermit();
+    }
+
+    /**
      * Returns the number of (successful) acquire invocations during the last
      * period. This is the number of times the {@link #acquire()} method was
      * called without blocking. This can be useful for testing or debugging
@@ -420,4 +437,34 @@ public class TimedSemaphore {
         acquireCount = 0;
         notifyAll();
     }
+
+    /**
+     * Prepares an acquire operation. Checks for the current state and starts the internal
+     * timer if necessary. This method must be called with the lock of this object held.
+     */
+    private void prepareAcquire() {
+        if (isShutdown()) {
+            throw new IllegalStateException("TimedSemaphore is shut down!");
+        }
+
+        if (task == null) {
+            task = startTimer();
+        }
+    }
+
+    /**
+     * Internal helper method for acquiring a permit. This method checks whether currently
+     * a permit can be acquired and - if so - increases the internal counter. The return
+     * value indicates whether a permit could be acquired. This method must be called with
+     * the lock of this object held.
+     *
+     * @return a flag whether a permit could be acquired
+     */
+    private boolean acquirePermit() {
+        if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
+            acquireCount++;
+            return true;
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/commons-lang/blob/809e2bed/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
index 067043d..3511021 100644
--- a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
+++ b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
@@ -385,6 +385,42 @@ public class TimedSemaphoreTest {
     }
 
     /**
+     * Tests the tryAcquire() method. It is checked whether the semaphore can be acquired
+     * by a bunch of threads the expected number of times and not more.
+     */
+    @Test
+    public void testTryAcquire() throws InterruptedException {
+        final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, TimeUnit.SECONDS,
+                LIMIT);
+        TryAcquireThread[] threads = new TryAcquireThread[3 * LIMIT];
+        CountDownLatch latch = new CountDownLatch(1);
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new TryAcquireThread(semaphore, latch);
+            threads[i].start();
+        }
+
+        latch.countDown();
+        int permits = 0;
+        for (TryAcquireThread t : threads) {
+            t.join();
+            if (t.acquired) {
+                permits++;
+            }
+        }
+        assertEquals("Wrong number of permits granted", LIMIT, permits);
+    }
+
+    /**
+     * Tries to call tryAcquire() after shutdown(). This should cause an exception.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testTryAcquireAfterShutdown() {
+        final TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
+        semaphore.shutdown();
+        semaphore.tryAcquire();
+    }
+
+    /**
      * A specialized implementation of {@code TimedSemaphore} that is easier to
      * test.
      */
@@ -495,4 +531,35 @@ public class TimedSemaphoreTest {
             }
         }
     }
+
+    /**
+     * A test thread class which invokes {@code tryAcquire()} on the test semaphore and
+     * records the return value.
+     */
+    private static class TryAcquireThread extends Thread {
+        /** The semaphore. */
+        private final TimedSemaphore semaphore;
+
+        /** A latch for communication with the main thread. */
+        private final CountDownLatch latch;
+
+        /** Flag whether a permit could be acquired. */
+        private boolean acquired;
+
+        public TryAcquireThread(TimedSemaphore s, CountDownLatch l) {
+            semaphore = s;
+            latch = l;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (latch.await(10, TimeUnit.SECONDS)) {
+                    acquired = semaphore.tryAcquire();
+                }
+            } catch (InterruptedException iex) {
+                // ignore
+            }
+        }
+    }
 }


Mime
View raw message