Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Thu May 20 12:02:10 2010 @@ -17,7 +17,6 @@ package org.apache.activemq.store.amq; import java.io.File; - import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.index.hash.HashIndex; import org.apache.activemq.store.PersistenceAdapter; @@ -35,7 +34,6 @@ import org.apache.activemq.util.IOHelper */ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024; - private TaskRunnerFactory taskRunnerFactory; private File dataDirectory; private int journalThreadPriority = Thread.MAX_PRIORITY; private String brokerName = "localhost"; @@ -56,6 +54,7 @@ public class AMQPersistenceAdapterFactor private boolean forceRecoverReferenceStore=false; private long checkpointInterval = 1000 * 20; private boolean useDedicatedTaskRunner; + private TaskRunnerFactory taskRunnerFactory; /** @@ -82,6 +81,8 @@ public class AMQPersistenceAdapterFactor result.setMaxReferenceFileLength(getMaxReferenceFileLength()); result.setForceRecoverReferenceStore(isForceRecoverReferenceStore()); result.setRecoverReferenceStore(isRecoverReferenceStore()); + result.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); + result.setJournalThreadPriority(getJournalThreadPriority()); return result; } @@ -122,10 +123,6 @@ public class AMQPersistenceAdapterFactor * @return the taskRunnerFactory */ public TaskRunnerFactory getTaskRunnerFactory() { - if (taskRunnerFactory == null) { - taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, - true, 1000, isUseDedicatedTaskRunner()); - } return taskRunnerFactory; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu May 20 12:02:10 2010 @@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactor import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activeio.journal.InvalidRecordLocationException; import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.JournalEventListener; @@ -64,9 +63,9 @@ import org.apache.activemq.thread.Schedu import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; -import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; @@ -85,7 +84,7 @@ public class JournalPersistenceAdapter i private BrokerService brokerService; - protected static final Scheduler scheduler = Scheduler.getInstance(); + protected Scheduler scheduler; private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class); private Journal journal; @@ -97,20 +96,20 @@ public class JournalPersistenceAdapter i private final ConcurrentHashMap topics = new ConcurrentHashMap(); private SystemUsage usageManager; - private long checkpointInterval = 1000 * 60 * 5; + private final long checkpointInterval = 1000 * 60 * 5; private long lastCheckpointRequest = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis(); private int maxCheckpointWorkers = 10; private int maxCheckpointMessageAddSize = 1024 * 1024; - private JournalTransactionStore transactionStore = new JournalTransactionStore(this); + private final JournalTransactionStore transactionStore = new JournalTransactionStore(this); private ThreadPoolExecutor checkpointExecutor; private TaskRunner checkpointTask; private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private boolean fullCheckPoint; - private AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); @@ -267,7 +266,9 @@ public class JournalPersistenceAdapter i recover(); // Do a checkpoint periodically. - scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); + this.scheduler = new Scheduler("Journal Scheduler"); + this.scheduler.start(); + this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); } @@ -278,7 +279,8 @@ public class JournalPersistenceAdapter i return; } - scheduler.cancel(periodicCheckpointTask); + this.scheduler.cancel(periodicCheckpointTask); + this.scheduler.stop(); // Take one final checkpoint and stop checkpoint processing. checkpoint(true, true); @@ -723,6 +725,7 @@ public class JournalPersistenceAdapter i longTermPersistence.setBrokerName(brokerName); } + @Override public String toString() { return "JournalPersistenceAdapator(" + longTermPersistence + ")"; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu May 20 12:02:10 2010 @@ -415,6 +415,7 @@ public class KahaDBPersistenceAdapter im @Override public String toString() { - return "KahaDBPersistenceAdapter"; + String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET"; + return "KahaDBPersistenceAdapter[" + path +"]" ; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu May 20 12:02:10 2010 @@ -72,6 +72,7 @@ import org.apache.activemq.store.kahadb. import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ServiceStopper; @@ -94,6 +95,7 @@ public class KahaDBStore extends Message private boolean concurrentStoreAndDispatchQueues = true; private boolean concurrentStoreAndDispatchTopics = true; private int maxAsyncJobs = MAX_ASYNC_JOBS; + private Scheduler scheduler; public KahaDBStore() { @@ -155,6 +157,7 @@ public class KahaDBStore extends Message @Override public void doStart() throws Exception { + super.doStart(); this.queueSemaphore = new Semaphore(getMaxAsyncJobs()); this.topicSemaphore = new Semaphore(getMaxAsyncJobs()); this.asyncQueueJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs()); @@ -175,8 +178,6 @@ public class KahaDBStore extends Message return thread; } }); - super.doStart(); - } @Override @@ -204,6 +205,7 @@ public class KahaDBStore extends Message protected void addQueueTask(StoreQueueTask task) throws IOException { try { this.queueSemaphore.acquire(); + } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } @@ -327,7 +329,6 @@ public class KahaDBStore extends Message org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isEnableJournalDiskSyncs() && message.isResponseRequired()); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu May 20 12:02:10 2010 @@ -82,7 +82,7 @@ import org.apache.kahadb.util.VariableMa public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { - private BrokerService brokerService; + protected BrokerService brokerService; public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500")); @@ -245,7 +245,6 @@ public class MessageDatabase extends Ser // to see if we need to exit this thread. long sleepTime = Math.min(checkpointInterval, 500); while (opened.get()) { - Thread.sleep(sleepTime); long now = System.currentTimeMillis(); if( now - lastCleanup >= cleanupInterval ) { @@ -276,9 +275,7 @@ public class MessageDatabase extends Ser public void open() throws IOException { if( opened.compareAndSet(false, true) ) { getJournal().start(); - - loadPageFile(); - + loadPageFile(); startCheckpoint(); recover(); } @@ -332,6 +329,11 @@ public class MessageDatabase extends Ser public void close() throws IOException, InterruptedException { if( opened.compareAndSet(true, false)) { synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, true); + } + }); pageFile.unload(); metadata = new Metadata(); } @@ -385,11 +387,12 @@ public class MessageDatabase extends Ser */ private void recover() throws IllegalStateException, IOException { synchronized (indexMutex) { - long start = System.currentTimeMillis(); - + + long start = System.currentTimeMillis(); Location recoveryPosition = getRecoveryPosition(); if( recoveryPosition!=null ) { int redoCounter = 0; + LOG.info("Recoverying from the journal ..."); while (recoveryPosition != null) { JournalCommand message = load(recoveryPosition); metadata.lastUpdate = recoveryPosition; @@ -398,7 +401,7 @@ public class MessageDatabase extends Ser recoveryPosition = journal.getNextLocation(recoveryPosition); } long end = System.currentTimeMillis(); - LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); } // We may have to undo some index updates. @@ -693,7 +696,7 @@ public class MessageDatabase extends Ser // from the recovery method too so they need to be idempotent // ///////////////////////////////////////////////////////////////// - private void process(JournalCommand data, final Location location) throws IOException { + void process(JournalCommand data, final Location location) throws IOException { data.visit(new Visitor() { @Override public void visit(KahaAddMessageCommand command) throws IOException { @@ -732,11 +735,12 @@ public class MessageDatabase extends Ser }); } - private void process(final KahaAddMessageCommand command, final Location location) throws IOException { + protected void process(final KahaAddMessageCommand command, final Location location) throws IOException { if (command.hasTransactionInfo()) { synchronized (indexMutex) { ArrayList inflightTx = getInflightTx(command.getTransactionInfo(), location); inflightTx.add(new AddOpperation(command, location)); + TransactionId key = key(command.getTransactionInfo()); } } else { synchronized (indexMutex) { @@ -836,7 +840,7 @@ public class MessageDatabase extends Ser protected final Object indexMutex = new Object(); private final HashSet journalFilesBeingReplicated = new HashSet(); - private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { + void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); // Skip adding the message to the index if this is a topic and there are @@ -870,7 +874,7 @@ public class MessageDatabase extends Ser } - private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { + void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); if (!command.hasSubscriptionKey()) { @@ -902,7 +906,7 @@ public class MessageDatabase extends Ser } } - private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { + void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); sd.orderIndex.clear(tx); sd.orderIndex.unload(tx); @@ -931,7 +935,7 @@ public class MessageDatabase extends Ser metadata.destinations.remove(tx, key); } - private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { + void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); // If set then we are creating it.. otherwise we are destroying the sub @@ -961,8 +965,7 @@ public class MessageDatabase extends Ser * @param tx * @throws IOException */ - private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { - + void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { LOG.debug("Checkpoint started."); metadata.state = OPEN_STATE; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Thu May 20 12:02:10 2010 @@ -19,32 +19,25 @@ package org.apache.activemq.thread; import java.util.HashMap; import java.util.Timer; import java.util.TimerTask; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; /** - * Singelton, references maintained by users * @version $Revision$ */ -public final class Scheduler { - - private final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true); - private final HashMap TIMER_TASKS = new HashMap(); - private static Scheduler instance; - - static { - instance = new Scheduler(); - } +public final class Scheduler extends ServiceSupport { + private final String name; + private Timer timer; + private final HashMap timerTasks = new HashMap(); - private Scheduler() { - } - - public static Scheduler getInstance() { - return instance; + public Scheduler (String name) { + this.name = name; } - - public synchronized void executePeriodically(final Runnable task, long period) { + + public void executePeriodically(final Runnable task, long period) { TimerTask timerTask = new SchedulerTimerTask(task); - CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period); - TIMER_TASKS.put(task, timerTask); + timer.scheduleAtFixedRate(timerTask, period, period); + timerTasks.put(task, timerTask); } /* @@ -53,24 +46,38 @@ public final class Scheduler { */ public synchronized void schedualPeriodically(final Runnable task, long period) { TimerTask timerTask = new SchedulerTimerTask(task); - CLOCK_DAEMON.schedule(timerTask, period, period); - TIMER_TASKS.put(task, timerTask); + timer.schedule(timerTask, period, period); + timerTasks.put(task, timerTask); } public synchronized void cancel(Runnable task) { - TimerTask ticket = TIMER_TASKS.remove(task); + TimerTask ticket = timerTasks.remove(task); if (ticket != null) { ticket.cancel(); - CLOCK_DAEMON.purge();//remove cancelled TimerTasks + timer.purge();//remove cancelled TimerTasks } } - public void executeAfterDelay(final Runnable task, long redeliveryDelay) { + public synchronized void executeAfterDelay(final Runnable task, long redeliveryDelay) { TimerTask timerTask = new SchedulerTimerTask(task); - CLOCK_DAEMON.schedule(timerTask, redeliveryDelay); + timer.schedule(timerTask, redeliveryDelay); } public void shutdown() { - CLOCK_DAEMON.cancel(); + timer.cancel(); + } + + @Override + protected synchronized void doStart() throws Exception { + this.timer = new Timer(name, true); + + } + + @Override + protected synchronized void doStop(ServiceStopper stopper) throws Exception { + if (this.timer != null) { + this.timer.cancel(); + } + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu May 20 12:02:10 2010 @@ -18,6 +18,7 @@ package org.apache.activemq.usage; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.Service; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.kahadb.plist.PListStore; @@ -36,6 +37,7 @@ public class SystemUsage implements Serv private MemoryUsage memoryUsage; private StoreUsage storeUsage; private TempUsage tempUsage; + private ThreadPoolExecutor executor; /** * True if someone called setSendFailIfNoSpace() on this particular usage @@ -45,7 +47,7 @@ public class SystemUsage implements Serv private boolean sendFailIfNoSpace; private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet; private long sendFailIfNoSpaceAfterTimeout = 0; - + private final List children = new CopyOnWriteArrayList(); public SystemUsage() { @@ -58,14 +60,21 @@ public class SystemUsage implements Serv this.memoryUsage = new MemoryUsage(name + ":memory"); this.storeUsage = new StoreUsage(name + ":store", adapter); this.tempUsage = new TempUsage(name + ":temp", tempStore); + this.memoryUsage.setExecutor(getExecutor()); + this.storeUsage.setExecutor(getExecutor()); + this.tempUsage.setExecutor(getExecutor()); } public SystemUsage(SystemUsage parent, String name) { this.parent = parent; + this.executor = parent.getExecutor(); this.name = name; this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory"); this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store"); this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp"); + this.memoryUsage.setExecutor(getExecutor()); + this.storeUsage.setExecutor(getExecutor()); + this.tempUsage.setExecutor(getExecutor()); } public String getName() { @@ -186,6 +195,7 @@ public class SystemUsage implements Serv memoryUsage.setParent(parent.memoryUsage); } this.memoryUsage = memoryUsage; + this.memoryUsage.setExecutor(getExecutor()); } public void setStoreUsage(StoreUsage storeUsage) { @@ -199,6 +209,7 @@ public class SystemUsage implements Serv storeUsage.setParent(parent.storeUsage); } this.storeUsage = storeUsage; + this.storeUsage.setExecutor(executor); } @@ -213,5 +224,30 @@ public class SystemUsage implements Serv tempDiskUsage.setParent(parent.tempUsage); } this.tempUsage = tempDiskUsage; + this.tempUsage.setExecutor(getExecutor()); + } + + /** + * @return the executor + */ + public ThreadPoolExecutor getExecutor() { + return this.executor; + } + + /** + * @param executor + * the executor to set + */ + public void setExecutor(ThreadPoolExecutor executor) { + this.executor = executor; + if (this.memoryUsage != null) { + this.memoryUsage.setExecutor(this.executor); + } + if (this.storeUsage != null) { + this.storeUsage.setExecutor(this.executor); + } + if (this.tempUsage != null) { + this.tempUsage.setExecutor(this.executor); + } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu May 20 12:02:10 2010 @@ -21,13 +21,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,7 +38,6 @@ import org.apache.commons.logging.LogFac public abstract class Usage implements Service { private static final Log LOG = LogFactory.getLog(Usage.class); - private static ThreadPoolExecutor executor; protected final Object usageMutex = new Object(); protected int percentUsage; protected T parent; @@ -53,12 +47,11 @@ public abstract class Usage children = new CopyOnWriteArrayList(); + private final List children = new CopyOnWriteArrayList(); private final List callbacks = new LinkedList(); private int pollingTime = 100; - - private AtomicBoolean started=new AtomicBoolean(); - + private final AtomicBoolean started=new AtomicBoolean(); + private ThreadPoolExecutor executor; public Usage(T parent, String name, float portion) { this.parent = parent; this.usagePortion = portion; @@ -289,6 +282,7 @@ public abstract class Usage(), new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Usage Async Task"); - thread.setDaemon(true); - return thread; - } - }); + public ThreadPoolExecutor getExecutor() { + return executor; } - } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java Thu May 20 12:02:10 2010 @@ -16,14 +16,23 @@ */ package org.apache.activemq.broker; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.IOHelper; public class BrokerRestartTestSupport extends BrokerTestSupport { private PersistenceAdapter persistenceAdapter; + @Override protected BrokerService createBroker() throws Exception { BrokerService broker = new BrokerService(); + File dir = broker.getBrokerDataDirectory(); + if (dir != null) { + IOHelper.deleteChildren(dir); + } //broker.setPersistent(false); broker.setDeleteAllMessagesOnStartup(true); persistenceAdapter = broker.getPersistenceAdapter(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Thu May 20 12:02:10 2010 @@ -17,13 +17,14 @@ package org.apache.activemq.bugs; import java.io.File; +import java.util.ArrayList; +import java.util.List; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -41,9 +42,7 @@ import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; - import junit.framework.Test; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; @@ -121,7 +120,7 @@ public class DurableConsumerTest extends } private class MessagePublisher implements Runnable{ - private boolean shouldPublish = true; + private final boolean shouldPublish = true; public void run(){ TopicConnectionFactory topicConnectionFactory = null; @@ -170,13 +169,14 @@ public class DurableConsumerTest extends Thread publisherThread = new Thread(new MessagePublisher()); publisherThread.start(); - + final List list = new ArrayList(); for (int i = 0; i < 100; i++) { final int id = i; Thread thread = new Thread(new Runnable(){ public void run(){ - new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME); + SimpleTopicSubscriber s =new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME); + list.add(s); } }); thread.start(); @@ -189,6 +189,9 @@ public class DurableConsumerTest extends configurePersistence(broker); broker.start(); Thread.sleep(10000); + for (SimpleTopicSubscriber s:list) { + s.closeConnection(); + } assertEquals(0, exceptions.size()); } @@ -358,6 +361,7 @@ public class DurableConsumerTest extends } + @Override protected void setUp() throws Exception{ if (broker == null) { broker = createBroker(true); @@ -366,6 +370,7 @@ public class DurableConsumerTest extends super.setUp(); } + @Override protected void tearDown() throws Exception{ super.tearDown(); if (broker != null) { @@ -392,11 +397,13 @@ public class DurableConsumerTest extends protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{ answer.setDeleteAllMessagesOnStartup(deleteStore); KahaDBStore kaha = new KahaDBStore(); + //kaha.setConcurrentStoreAndDispatchTopics(false); File directory = new File("target/activemq-data/kahadb"); if (deleteStore) { IOHelper.deleteChildren(directory); } kaha.setDirectory(directory); + //kaha.setMaxAsyncJobs(10); answer.setPersistenceAdapter(kaha); answer.addConnector(bindAddress); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java Thu May 20 12:02:10 2010 @@ -26,7 +26,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.thread.Scheduler; import org.apache.log4j.Logger; public class EmbeddedActiveMQ @@ -39,6 +38,7 @@ public class EmbeddedActiveMQ BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); BrokerService brokerService = null; + Connection connection = null; logger.info("Start..."); try @@ -49,7 +49,7 @@ public class EmbeddedActiveMQ logger.info("Broker '" + brokerService.getBrokerName() + "' is starting........"); brokerService.start(); ConnectionFactory fac = new ActiveMQConnectionFactory("vm://TestMQ"); - Connection connection = fac.createConnection(); + connection = fac.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("TEST.QUEUE"); MessageProducer producer = session.createProducer(queue); @@ -71,12 +71,9 @@ public class EmbeddedActiveMQ try { br.close(); - Scheduler scheduler = Scheduler.getInstance(); - scheduler.shutdown(); logger.info("Broker '" + brokerService.getBrokerName() + "' is stopping........"); - brokerService.stop(); - Scheduler.getInstance().shutdown(); - + connection.close(); + brokerService.stop(); sleep(8); logger.info(ThreadExplorer.show("Active threads after stop:")); @@ -90,7 +87,7 @@ public class EmbeddedActiveMQ logger.info("Waiting for list theads is greater then 1 ..."); int numTh = ThreadExplorer.active(); - while (numTh > 1) + while (numTh > 2) { sleep(3); numTh = ThreadExplorer.active(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java Thu May 20 12:02:10 2010 @@ -18,15 +18,16 @@ package org.apache.activemq.network; import javax.jms.MessageProducer; import javax.jms.TemporaryQueue; - import org.apache.activemq.broker.BrokerService; public class DuplexNetworkTest extends SimpleNetworkTest { + @Override protected String getLocalBrokerURI() { return "org/apache/activemq/network/duplexLocalBroker.xml"; } + @Override protected BrokerService createRemoteBroker() throws Exception { BrokerService broker = new BrokerService(); broker.setBrokerName("remoteBroker"); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Thu May 20 12:02:10 2010 @@ -17,7 +17,6 @@ package org.apache.activemq.network; import java.net.URI; - import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -30,9 +29,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicRequestor; import javax.jms.TopicSession; - import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; @@ -174,11 +171,13 @@ public class SimpleNetworkTest extends T } } + @Override protected void setUp() throws Exception { super.setUp(); doSetUp(); } + @Override protected void tearDown() throws Exception { localBroker.deleteAllMessages(); remoteBroker.deleteAllMessages(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java Thu May 20 12:02:10 2010 @@ -20,10 +20,11 @@ package org.apache.activemq.usage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; - import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -31,6 +32,7 @@ import org.junit.Test; public class MemoryUsageTest { MemoryUsage underTest; + ThreadPoolExecutor executor; @Test public final void testPercentUsageNeedsNoThread() { @@ -46,6 +48,7 @@ public class MemoryUsageTest { public final void testAddUsageListenerStartsThread() throws Exception { int activeThreadCount = Thread.activeCount(); underTest = new MemoryUsage(); + underTest.setExecutor(executor); underTest.setLimit(10); underTest.start(); final CountDownLatch called = new CountDownLatch(1); @@ -66,12 +69,24 @@ public class MemoryUsageTest { @Before public void setUp() throws Exception { - underTest = new MemoryUsage(); + underTest = new MemoryUsage(); + this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Usage Async Task"); + thread.setDaemon(true); + return thread; + } + }); + underTest.setExecutor(this.executor); + } @After public void tearDown() { assertNotNull(underTest); underTest.stop(); + if (this.executor != null) { + this.executor.shutdownNow(); + } } }