commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ohe...@apache.org
Subject svn commit: r895466 - in /commons/proper/lang/trunk: pom.xml src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
Date Sun, 03 Jan 2010 19:04:08 GMT
Author: oheger
Date: Sun Jan  3 19:04:08 2010
New Revision: 895466

URL: http://svn.apache.org/viewvc?rev=895466&view=rev
Log:
[LANG-560] Added TimedSemaphore class to concurrent package.

Added:
    commons/proper/lang/trunk/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
  (with props)
    commons/proper/lang/trunk/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
  (with props)
Modified:
    commons/proper/lang/trunk/pom.xml

Modified: commons/proper/lang/trunk/pom.xml
URL: http://svn.apache.org/viewvc/commons/proper/lang/trunk/pom.xml?rev=895466&r1=895465&r2=895466&view=diff
==============================================================================
--- commons/proper/lang/trunk/pom.xml (original)
+++ commons/proper/lang/trunk/pom.xml Sun Jan  3 19:04:08 2010
@@ -415,7 +415,14 @@
       <version>4.7</version>
       <scope>test</scope>
     </dependency>
-  </dependencies> 
+
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>2.5.2</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

Added: commons/proper/lang/trunk/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
URL: http://svn.apache.org/viewvc/commons/proper/lang/trunk/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java?rev=895466&view=auto
==============================================================================
--- commons/proper/lang/trunk/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
(added)
+++ commons/proper/lang/trunk/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
Sun Jan  3 19:04:08 2010
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.lang3.concurrent;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * A specialized <em>semaphore</em> implementation that provides a number of
+ * permits in a given time frame.
+ * </p>
+ * <p>
+ * This class is similar to the {@code java.util.concurrent.Semaphore} class
+ * provided by the JDK in that it manages a configurable number of permits.
+ * Using the {@link #acquire()} method a permit can be requested by a thread.
+ * However, there is an additional timing dimension: there is no {@code
+ * release()} method for freeing a permit, but all permits are automatically
+ * released at the end of a configurable time frame. If a thread calls
+ * {@link #acquire()} and the available permits are already exhausted for this
+ * time frame, the thread is blocked. When the time frame ends all permits
+ * requested so far are restored, and blocking threads are waked up again, so
+ * that they can try to acquire a new permit. This basically means that in the
+ * specified time frame only the given number of operations is possible.
+ * </p>
+ * <p>
+ * A use case for this class is to artificially limit the load produced by a
+ * process. As an example consider an application that issues database queries
+ * on a production system in a background process to gather statistical
+ * information. This background processing should not produce so much database
+ * load that the functionality and the performance of the production system are
+ * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
+ * only a given number of database queries are issued per second.
+ * </p>
+ * <p>
+ * A thread class for performing database queries could look as follows:
+ *
+ * <pre>
+ * public class StatisticsThread extends Thread {
+ *     // The semaphore for limiting database load.
+ *     private final TimedSemaphore semaphore;
+ *     // Create an instance and set the semaphore
+ *     public StatisticsThread(TimedSemaphore timedSemaphore) {
+ *         semaphore = timedSemaphore;
+ *     }
+ *     // Gather statistics
+ *     public void run() {
+ *         try {
+ *             while(true) {
+ *                 semaphore.acquire();   // limit database load
+ *                 performQuery();        // issue a query
+ *             }
+ *         } catch(InterruptedException) {
+ *             // fall through
+ *         }
+ *     }
+ *     ...
+ * }
+ * </pre>
+ *
+ * The following code fragment shows how a {@code TimedSemaphore} is created
+ * that allows only 10 operations per second and passed to the statistics
+ * thread:
+ *
+ * <pre>
+ * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
+ * StatisticsThread thread = new StatisticsThread(sem);
+ * thread.start();
+ * </pre>
+ *
+ * </p>
+ * <p>
+ * When creating an instance the time period for the semaphore must be
+ * specified. {@code TimedSemaphore} uses an executor service with a
+ * corresponding period to monitor this interval. The {@code
+ * ScheduledExecutorService} to be used for this purpose can be provided at
+ * construction time. Alternatively the class creates an internal executor
+ * service.
+ * </p>
+ * <p>
+ * Client code that uses {@code TimedSemaphore} has to call the
+ * {@link #acquire()} method in aach processing step. {@code TimedSemaphore}
+ * keeps track of the number of invocations of the {@link #acquire()} method and
+ * blocks the calling thread if the counter exceeds the limit specified. When
+ * the timer signals the end of the time period the counter is reset and all
+ * waiting threads are released. Then another cycle can start.
+ * </p>
+ * <p>
+ * It is possible to modify the limit at any time using the
+ * {@link #setLimit(int)} method. This is useful if the load produced by an
+ * operation has to be adapted dynamically. In the example scenario with the
+ * thread collecting statistics it may make sense to specify a low limit during
+ * day time while allowing a higher load in the night time. Reducing the limit
+ * takes effect immediately by blocking incoming callers. If the limit is
+ * increased, waiting threads are not released immediately, but wake up when the
+ * timer runs out. Then, in the next period more processing steps can be
+ * performed without blocking. By setting the limit to 0 the semaphore can be
+ * switched off: in this mode the {@link #acquire()} method never blocks, but
+ * lets all callers pass directly.
+ * </p>
+ * <p>
+ * When the {@code TimedSemaphore} is no more needed its {@link #shutdown()}
+ * method should be called. This causes the periodic task that monitors the time
+ * interval to be canceled. If the {@code ScheduledExecutorService} has been
+ * created by the semaphore at construction time, it is also shut down.
+ * resources. After that {@link #acquire()} must not be called any more.
+ * </p>
+ *
+ * @version $Id:$
+ */
+public class TimedSemaphore {
+    /**
+     * Constant for a value representing no limit. If the limit is set to a
+     * value less or equal this constant, the {@code TimedSemaphore} will be
+     * effectively switched off.
+     */
+    public static final int NO_LIMIT = 0;
+
+    /** Constant for the thread pool size for the executor. */
+    private static final int THREAD_POOL_SIZE = 1;
+
+    /** The executor service for managing the timer thread. */
+    private final ScheduledExecutorService executorService;
+
+    /** Stores the period for this timed semaphore. */
+    private final long period;
+
+    /** The time unit for the period. */
+    private final TimeUnit unit;
+
+    /** A flag whether the executor service was created by this object. */
+    private final boolean ownExecutor;
+
+    /** A future object representing the timer task. */
+    private ScheduledFuture<?> task;
+
+    /** Stores the total number of invocations of the acquire() method. */
+    private long totalAcquireCount;
+
+    /**
+     * The counter for the periods. This counter is increased every time a
+     * period ends.
+     */
+    private long periodCount;
+
+    /** The limit. */
+    private int limit;
+
+    /** The current counter. */
+    private int acquireCount;
+
+    /** The number of invocations of acquire() in the last period. */
+    private int lastCallsPerPeriod;
+
+    /** A flag whether shutdown() was called. */
+    private boolean shutdown;
+
+    /**
+     * Creates a new instance of {@link TimedSemaphore} and initializes it with
+     * the given time period and the limit.
+     *
+     * @param timePeriod the time period
+     * @param timeUnit the unit for the period
+     * @param limit the limit for the semaphore
+     * @throws IllegalArgumentException if the period is less or equals 0
+     */
+    public TimedSemaphore(long timePeriod, TimeUnit timeUnit, int limit) {
+        this(null, timePeriod, timeUnit, limit);
+    }
+
+    /**
+     * Creates a new instance of {@link TimedSemaphore} and initializes it with
+     * an executor service, the given time period, and the limit. The executor
+     * service will be used for creating a periodic task for monitoring the time
+     * period. It can be <b>null</b>, then a default service will be created.
+     *
+     * @param service the executor service
+     * @param timePeriod the time period
+     * @param timeUnit the unit for the period
+     * @param limit the limit for the semaphore
+     * @throws IllegalArgumentException if the period is less or equals 0
+     */
+    public TimedSemaphore(ScheduledExecutorService service, long timePeriod,
+            TimeUnit timeUnit, int limit) {
+        if (timePeriod <= 0) {
+            throw new IllegalArgumentException("Time period must be greater 0!");
+        }
+
+        period = timePeriod;
+        unit = timeUnit;
+
+        if (service != null) {
+            executorService = service;
+            ownExecutor = false;
+        } else {
+            ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
+                    THREAD_POOL_SIZE);
+            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+            executorService = s;
+            ownExecutor = true;
+        }
+
+        setLimit(limit);
+    }
+
+    /**
+     * Returns the limit enforced by this semaphore. The limit determines how
+     * many invocations of {@link #acquire()} are allowed within the monitored
+     * period.
+     *
+     * @return the limit
+     */
+    public final synchronized int getLimit() {
+        return limit;
+    }
+
+    /**
+     * Sets the limit. This is the number of times the {@link #acquire()} method
+     * can be called within the time period specified. If this limit is reached,
+     * further invocations of {@link #acquire()} will block. Setting the limit
+     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
+     * i.e. an arbitrary number of{@link #acquire()} invocations is allowed in
+     * the time period.
+     *
+     * @param limit the limit
+     */
+    public final synchronized void setLimit(int limit) {
+        this.limit = limit;
+    }
+
+    /**
+     * Initializes a shutdown. After that the object cannot be used any more.
+     * This method can be invoked an arbitrary number of times. All invocations
+     * after the first one do not have any effect.
+     */
+    public synchronized void shutdown() {
+        if (!shutdown) {
+
+            if (ownExecutor) {
+                // if the executor was created by this instance, it has
+                // to be shutdown
+                getExecutorService().shutdownNow();
+            }
+            if (task != null) {
+                task.cancel(false);
+            }
+
+            shutdown = true;
+        }
+    }
+
+    /**
+     * Tests whether the {@link #shutdown()} method has been called on this
+     * object. If this method returns <b>true</b>, this instance cannot be used
+     * any longer.
+     *
+     * @return a flag whether a shutdown has been performed
+     */
+    public synchronized boolean isShutdown() {
+        return shutdown;
+    }
+
+    /**
+     * Tries to acquire a permit from this semaphore. This method will block if
+     * the limit for the current period has already been reached. If
+     * {@link #shutdown()} has already been invoked, calling this method will
+     * cause an exception. The very first call of this method starts the timer
+     * task which monitors the time period set for this {@code TimedSemaphore}.
+     * From now on the semaphore is active.
+     *
+     * @throws InterruptedException if the thread gets interrupted
+     * @throws IllegalStateException if this semaphore is already shut down
+     */
+    public synchronized void acquire() throws InterruptedException {
+        if (isShutdown()) {
+            throw new IllegalStateException("TimedSemaphore is shut down!");
+        }
+
+        if (task == null) {
+            task = startTimer();
+        }
+
+        boolean canPass = false;
+        do {
+            canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit();
+            if (!canPass) {
+                wait();
+            } else {
+                acquireCount++;
+            }
+        } while (!canPass);
+    }
+
+    /**
+     * Returns the number of (successful) acquire invocations during the last
+     * period. This is the number of times the {@link #acquire()} method was
+     * called without blocking. This can be useful for testing or debugging
+     * purposes or to determine a meaningful threshold value. If a limit is set,
+     * the value returned by this method won't be greater than this limit.
+     *
+     * @return the number of non-blocking invocations of the {@link #acquire()}
+     * method
+     */
+    public synchronized int getLastAcquiresPerPeriod() {
+        return lastCallsPerPeriod;
+    }
+
+    /**
+     * Returns the number of invocations of the {@link #acquire()} method for
+     * the current period. This may be useful for testing or debugging purposes.
+     *
+     * @return the current number of {@link #acquire()} invocations
+     */
+    public synchronized int getAcquireCount() {
+        return acquireCount;
+    }
+
+    /**
+     * Returns the number of calls to the {@link #acquire()} method that can
+     * still be performed in the current period without blocking. This method
+     * can give an indication whether it is safe to call the {@link #acquire()}
+     * method without risking to be suspended. However, there is no guarantee
+     * that a subsequent call to {@link #acquire()} actually is not-blocking
+     * because in the mean time other threads may have invoked the semaphore.
+     *
+     * @return the current number of available {@link #acquire()} calls in the
+     * current period
+     */
+    public synchronized int getAvailablePermits() {
+        return getLimit() - getAcquireCount();
+    }
+
+    /**
+     * Returns the average number of successful (i.e. non-blocking)
+     * {@link #acquire()} invocations for the entire life-time of this {@code
+     * TimedSemaphore}. This method can be used for instance for statistical
+     * calculations.
+     *
+     * @return the average number of {@link #acquire()} invocations per time
+     * unit
+     */
+    public synchronized double getAverageCallsPerPeriod() {
+        return (periodCount == 0) ? 0 : (double) totalAcquireCount
+                / (double) periodCount;
+    }
+
+    /**
+     * Returns the time period. This is the time monitored by this semaphore.
+     * Only a given number of invocations of the {@link #acquire()} method is
+     * possible in this period.
+     *
+     * @return the time period
+     */
+    public long getPeriod() {
+        return period;
+    }
+
+    /**
+     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
+     *
+     * @return the time unit
+     */
+    public TimeUnit getUnit() {
+        return unit;
+    }
+
+    /**
+     * Returns the executor service used by this instance.
+     *
+     * @return the executor service
+     */
+    protected ScheduledExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    /**
+     * Starts the timer. This method is called when {@link #acquire()} is called
+     * for the first time. It schedules a task to be executed at fixed rate to
+     * monitor the time period specified.
+     *
+     * @return a future object representing the task scheduled
+     */
+    protected ScheduledFuture<?> startTimer() {
+        return getExecutorService().scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                endOfPeriod();
+            }
+        }, getPeriod(), getPeriod(), getUnit());
+    }
+
+    /**
+     * The current time period is finished. This method is called by the timer
+     * used internally to monitor the time period. It resets the counter and
+     * releases the threads waiting for this barrier.
+     */
+    synchronized void endOfPeriod() {
+        lastCallsPerPeriod = acquireCount;
+        totalAcquireCount += acquireCount;
+        periodCount++;
+        acquireCount = 0;
+        notifyAll();
+    }
+}

Propchange: commons/proper/lang/trunk/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: commons/proper/lang/trunk/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: commons/proper/lang/trunk/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: commons/proper/lang/trunk/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
URL: http://svn.apache.org/viewvc/commons/proper/lang/trunk/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java?rev=895466&view=auto
==============================================================================
--- commons/proper/lang/trunk/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
(added)
+++ commons/proper/lang/trunk/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
Sun Jan  3 19:04:08 2010
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.lang3.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.TimedSemaphore;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+/**
+ * Test class for TimedSemaphore.
+ *
+ * @version $Id$
+ */
+public class TimedSemaphoreTest {
+    /** Constant for the time period. */
+    private static final long PERIOD = 500;
+
+    /** Constant for the time unit. */
+    private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
+
+    /** Constant for the default limit. */
+    private static final int LIMIT = 10;
+
+    /**
+     * Tests creating a new instance.
+     */
+    @Test
+    public void testInit() {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        EasyMock.replay(service);
+        TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
+                LIMIT);
+        EasyMock.verify(service);
+        assertEquals("Wrong service", service, semaphore.getExecutorService());
+        assertEquals("Wrong period", PERIOD, semaphore.getPeriod());
+        assertEquals("Wrong unit", UNIT, semaphore.getUnit());
+        assertEquals("Statistic available", 0, semaphore
+                .getLastAcquiresPerPeriod());
+        assertEquals("Average available", 0.0, semaphore
+                .getAverageCallsPerPeriod(), .05);
+        assertFalse("Already shutdown", semaphore.isShutdown());
+        assertEquals("Wrong limit", LIMIT, semaphore.getLimit());
+    }
+
+    /**
+     * Tries to create an instance with a negative period. This should cause an
+     * exception.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testInitInvalidPeriod() {
+        new TimedSemaphore(0L, UNIT, LIMIT);
+    }
+
+    /**
+     * Tests whether a default executor service is created if no service is
+     * provided.
+     */
+    @Test
+    public void testInitDefaultService() {
+        TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
+        ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) semaphore
+                .getExecutorService();
+        assertFalse("Wrong periodic task policy", exec
+                .getContinueExistingPeriodicTasksAfterShutdownPolicy());
+        assertFalse("Wrong delayed task policy", exec
+                .getExecuteExistingDelayedTasksAfterShutdownPolicy());
+        assertFalse("Already shutdown", exec.isShutdown());
+        semaphore.shutdown();
+    }
+
+    /**
+     * Tests starting the timer.
+     */
+    @Test
+    public void testStartTimer() throws InterruptedException {
+        TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(PERIOD,
+                UNIT, LIMIT);
+        ScheduledFuture<?> future = semaphore.startTimer();
+        assertNotNull("No future returned", future);
+        Thread.sleep(PERIOD);
+        final int trials = 10;
+        int count = 0;
+        do {
+            Thread.sleep(PERIOD);
+            if (count++ > trials) {
+                fail("endOfPeriod() not called!");
+            }
+        } while (semaphore.getPeriodEnds() <= 0);
+        semaphore.shutdown();
+    }
+
+    /**
+     * Tests the shutdown() method if the executor belongs to the semaphore. In
+     * this case it has to be shut down.
+     */
+    @Test
+    public void testShutdownOwnExecutor() {
+        TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
+        semaphore.shutdown();
+        assertTrue("Not shutdown", semaphore.isShutdown());
+        assertTrue("Executor not shutdown", semaphore.getExecutorService()
+                .isShutdown());
+    }
+
+    /**
+     * Tests the shutdown() method for a shared executor service before a task
+     * was started. This should do pretty much nothing.
+     */
+    @Test
+    public void testShutdownSharedExecutorNoTask() {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        EasyMock.replay(service);
+        TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
+                LIMIT);
+        semaphore.shutdown();
+        assertTrue("Not shutdown", semaphore.isShutdown());
+        EasyMock.verify(service);
+    }
+
+    /**
+     * Prepares an executor service mock to expect the start of the timer.
+     *
+     * @param service the mock
+     * @param future the future
+     */
+    private void prepareStartTimer(ScheduledExecutorService service,
+            ScheduledFuture<?> future) {
+        service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock
+                .eq(PERIOD), EasyMock.eq(PERIOD), EasyMock.eq(UNIT));
+        EasyMock.expectLastCall().andReturn(future);
+    }
+
+    /**
+     * Tests the shutdown() method for a shared executor after the task was
+     * started. In this case the task must be canceled.
+     */
+    @Test
+    public void testShutdownSharedExecutorTask() throws InterruptedException {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+        prepareStartTimer(service, future);
+        EasyMock.expect(future.cancel(false)).andReturn(true);
+        EasyMock.replay(service, future);
+        TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
+                PERIOD, UNIT, LIMIT);
+        semaphore.acquire();
+        semaphore.shutdown();
+        assertTrue("Not shutdown", semaphore.isShutdown());
+        EasyMock.verify(service, future);
+    }
+
+    /**
+     * Tests multiple invocations of the shutdown() method.
+     */
+    @Test
+    public void testShutdownMultipleTimes() throws InterruptedException {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+        prepareStartTimer(service, future);
+        EasyMock.expect(future.cancel(false)).andReturn(true);
+        EasyMock.replay(service, future);
+        TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
+                PERIOD, UNIT, LIMIT);
+        semaphore.acquire();
+        for (int i = 0; i < 10; i++) {
+            semaphore.shutdown();
+        }
+        EasyMock.verify(service, future);
+    }
+
+    /**
+     * Tests the acquire() method if a limit is set.
+     */
+    @Test
+    public void testAcquireLimit() throws InterruptedException {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+        prepareStartTimer(service, future);
+        EasyMock.replay(service, future);
+        final int count = 10;
+        CountDownLatch latch = new CountDownLatch(count - 1);
+        TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT, 1);
+        SemaphoreThread t = new SemaphoreThread(semaphore, latch, count,
+                count - 1);
+        semaphore.setLimit(count - 1);
+
+        // start a thread that calls the semaphore count times
+        t.start();
+        latch.await();
+        // now the semaphore's limit should be reached and the thread blocked
+        assertEquals("Wrong semaphore count", count - 1, semaphore
+                .getAcquireCount());
+
+        // this wakes up the thread, it should call the semaphore once more
+        semaphore.endOfPeriod();
+        t.join();
+        assertEquals("Wrong semaphore count (2)", 1, semaphore
+                .getAcquireCount());
+        assertEquals("Wrong acquire() count", count - 1, semaphore
+                .getLastAcquiresPerPeriod());
+        EasyMock.verify(service, future);
+    }
+
+    /**
+     * Tests the acquire() method if more threads are involved than the limit.
+     * This method starts a number of threads that all invoke the semaphore. The
+     * semaphore's limit is set to 1, so in each period only a single thread can
+     * acquire the semaphore.
+     */
+    @Test
+    public void testAcquireMultipleThreads() throws InterruptedException {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+        prepareStartTimer(service, future);
+        EasyMock.replay(service, future);
+        TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
+                PERIOD, UNIT, 1);
+        semaphore.latch = new CountDownLatch(1);
+        final int count = 10;
+        SemaphoreThread[] threads = new SemaphoreThread[count];
+        for (int i = 0; i < count; i++) {
+            threads[i] = new SemaphoreThread(semaphore, null, 1, 0);
+            threads[i].start();
+        }
+        for (int i = 0; i < count; i++) {
+            semaphore.latch.await();
+            assertEquals("Wrong count", 1, semaphore.getAcquireCount());
+            semaphore.latch = new CountDownLatch(1);
+            semaphore.endOfPeriod();
+            assertEquals("Wrong acquire count", 1, semaphore
+                    .getLastAcquiresPerPeriod());
+        }
+        for (int i = 0; i < count; i++) {
+            threads[i].join();
+        }
+        EasyMock.verify(service, future);
+    }
+
+    /**
+     * Tests the acquire() method if no limit is set. A test thread is started
+     * that calls the semaphore a large number of times. Even if the semaphore's
+     * period does not end, the thread should never block.
+     */
+    @Test
+    public void testAcquireNoLimit() throws InterruptedException {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+        prepareStartTimer(service, future);
+        EasyMock.replay(service, future);
+        TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(service,
+                PERIOD, UNIT, TimedSemaphore.NO_LIMIT);
+        final int count = 1000;
+        CountDownLatch latch = new CountDownLatch(count);
+        SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
+        t.start();
+        latch.await();
+        EasyMock.verify(service, future);
+    }
+
+    /**
+     * Tries to call acquire() after shutdown(). This should cause an exception.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testPassAfterShutdown() throws InterruptedException {
+        TimedSemaphore semaphore = new TimedSemaphore(PERIOD, UNIT, LIMIT);
+        semaphore.shutdown();
+        semaphore.acquire();
+    }
+
+    /**
+     * Tests a bigger number of invocations that span multiple periods. The
+     * period is set to a very short time. A background thread calls the
+     * semaphore a large number of times. While it runs at last one end of a
+     * period should be reached.
+     */
+    @Test
+    public void testAcquireMultiplePeriods() throws InterruptedException {
+        final int count = 1000;
+        TimedSemaphoreTestImpl semaphore = new TimedSemaphoreTestImpl(
+                PERIOD / 10, TimeUnit.MILLISECONDS, 1);
+        semaphore.setLimit(count / 4);
+        CountDownLatch latch = new CountDownLatch(count);
+        SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
+        t.start();
+        latch.await();
+        semaphore.shutdown();
+        assertTrue("End of period not reached", semaphore.getPeriodEnds() > 0);
+    }
+
+    /**
+     * Tests the methods for statistics.
+     */
+    @Test
+    public void testGetAverageCallsPerPeriod() throws InterruptedException {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+        prepareStartTimer(service, future);
+        EasyMock.replay(service, future);
+        TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
+                LIMIT);
+        semaphore.acquire();
+        semaphore.endOfPeriod();
+        assertEquals("Wrong average (1)", 1.0, semaphore
+                .getAverageCallsPerPeriod(), .005);
+        semaphore.acquire();
+        semaphore.acquire();
+        semaphore.endOfPeriod();
+        assertEquals("Wrong average (2)", 1.5, semaphore
+                .getAverageCallsPerPeriod(), .005);
+        EasyMock.verify(service, future);
+    }
+
+    /**
+     * Tests whether the available non-blocking calls can be queried.
+     */
+    @Test
+    public void testGetAvailablePermits() throws InterruptedException {
+        ScheduledExecutorService service = EasyMock
+                .createMock(ScheduledExecutorService.class);
+        ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
+        prepareStartTimer(service, future);
+        EasyMock.replay(service, future);
+        TimedSemaphore semaphore = new TimedSemaphore(service, PERIOD, UNIT,
+                LIMIT);
+        for (int i = 0; i < LIMIT; i++) {
+            assertEquals("Wrong available count at " + i, LIMIT - i, semaphore
+                    .getAvailablePermits());
+            semaphore.acquire();
+        }
+        semaphore.endOfPeriod();
+        assertEquals("Wrong available count in new period", LIMIT, semaphore
+                .getAvailablePermits());
+        EasyMock.verify(service, future);
+    }
+
+    /**
+     * A specialized implementation of {@code TimedSemaphore} that is easier to
+     * test.
+     */
+    private static class TimedSemaphoreTestImpl extends TimedSemaphore {
+        /** A mock scheduled future. */
+        ScheduledFuture<?> schedFuture;
+
+        /** A latch for synchronizing with the main thread. */
+        volatile CountDownLatch latch;
+
+        /** Counter for the endOfPeriod() invocations. */
+        private int periodEnds;
+
+        public TimedSemaphoreTestImpl(long timePeriod, TimeUnit timeUnit,
+                int limit) {
+            super(timePeriod, timeUnit, limit);
+        }
+
+        public TimedSemaphoreTestImpl(ScheduledExecutorService service,
+                long timePeriod, TimeUnit timeUnit, int limit) {
+            super(service, timePeriod, timeUnit, limit);
+        }
+
+        /**
+         * Returns the number of invocations of the endOfPeriod() method.
+         *
+         * @return the endOfPeriod() invocations
+         */
+        public int getPeriodEnds() {
+            synchronized (this) {
+                return periodEnds;
+            }
+        }
+
+        /**
+         * Invokes the latch if one is set.
+         */
+        @Override
+        public void acquire() throws InterruptedException {
+            super.acquire();
+            if (latch != null) {
+                latch.countDown();
+            }
+        }
+
+        /**
+         * Counts the number of invocations.
+         */
+        @Override
+        protected void endOfPeriod() {
+            super.endOfPeriod();
+            synchronized (this) {
+                periodEnds++;
+            }
+        }
+
+        /**
+         * Either returns the mock future or calls the super method.
+         */
+        @Override
+        protected ScheduledFuture<?> startTimer() {
+            return (schedFuture != null) ? schedFuture : super.startTimer();
+        }
+    }
+
+    /**
+     * A test thread class that will be used by tests for triggering the
+     * semaphore. The thread calls the semaphore a configurable number of times.
+     * When this is done, it can notify the main thread.
+     */
+    private static class SemaphoreThread extends Thread {
+        /** The semaphore. */
+        private final TimedSemaphore semaphore;
+
+        /** A latch for communication with the main thread. */
+        private final CountDownLatch latch;
+
+        /** The number of acquire() calls. */
+        private final int count;
+
+        /** The number of invocations of the latch. */
+        private final int latchCount;
+
+        public SemaphoreThread(TimedSemaphore b, CountDownLatch l, int c, int lc) {
+            semaphore = b;
+            latch = l;
+            count = c;
+            latchCount = lc;
+        }
+
+        /**
+         * Calls acquire() on the semaphore for the specified number of times.
+         * Optionally the latch will also be triggered to synchronize with the
+         * main test thread.
+         */
+        @Override
+        public void run() {
+            try {
+                for (int i = 0; i < count; i++) {
+                    semaphore.acquire();
+
+                    if (i < latchCount) {
+                        latch.countDown();
+                    }
+                }
+            } catch (InterruptedException iex) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}

Propchange: commons/proper/lang/trunk/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: commons/proper/lang/trunk/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: commons/proper/lang/trunk/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message