Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 72156 invoked from network); 20 May 2010 12:02:38 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 20 May 2010 12:02:38 -0000 Received: (qmail 8325 invoked by uid 500); 20 May 2010 12:02:38 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 8277 invoked by uid 500); 20 May 2010 12:02:37 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 8270 invoked by uid 99); 20 May 2010 12:02:37 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 May 2010 12:02:37 +0000 X-ASF-Spam-Status: No, hits=-1714.7 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 May 2010 12:02:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A9BA82388903; Thu, 20 May 2010 12:02:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r946600 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/... Date: Thu, 20 May 2010 12:02:11 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100520120212.A9BA82388903@eris.apache.org> Author: rajdavies Date: Thu May 20 12:02:10 2010 New Revision: 946600 URL: http://svn.apache.org/viewvc?rev=946600&view=rev Log: fixes for https://issues.apache.org/activemq/browse/AMQ-2620 and https://issues.apache.org/activemq/browse/AMQ-2568 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu May 20 12:02:10 2010 @@ -87,6 +87,7 @@ import org.apache.activemq.management.JM import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; import org.apache.activemq.state.CommandVisitorAdapter; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; @@ -114,7 +115,7 @@ public class ActiveMQConnection implemen protected boolean alwaysSessionAsync = true; private TaskRunnerFactory sessionTaskRunner; - private final ThreadPoolExecutor asyncConnectionThread; + private final ThreadPoolExecutor executor; // Connection state variables private final ConnectionInfo info; @@ -188,6 +189,7 @@ public class ActiveMQConnection implemen private boolean useDedicatedTaskRunner; protected volatile CountDownLatch transportInterruptionProcessingComplete; private long consumerFailoverRedeliveryWaitPeriod; + private final Scheduler scheduler; /** * Construct an ActiveMQConnection @@ -204,16 +206,16 @@ public class ActiveMQConnection implemen // Configure a single threaded executor who's core thread can timeout if // idle - asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { public Thread newThread(Runnable r) { - Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport); + Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); thread.setDaemon(true); return thread; } }); // asyncConnectionThread.allowCoreThreadTimeOut(true); - - this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId())); + String uniqueId = CONNECTION_ID_GENERATOR.generateId(); + this.info = new ConnectionInfo(new ConnectionId(uniqueId)); this.info.setManageable(true); this.info.setFaultTolerant(transport.isFaultTolerant()); this.connectionSessionId = new SessionId(info.getConnectionId(), -1); @@ -224,6 +226,8 @@ public class ActiveMQConnection implemen this.factoryStats.addConnection(this); this.timeCreated = System.currentTimeMillis(); this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); + this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler"); + this.scheduler.start(); } protected void setUserName(String userName) { @@ -609,6 +613,14 @@ public class ActiveMQConnection implemen advisoryConsumer.dispose(); advisoryConsumer = null; } + if (this.scheduler != null) { + try { + this.scheduler.stop(); + } catch (Exception e) { + JMSException ex = JMSExceptionSupport.create(e); + throw ex; + } + } long lastDeliveredSequenceId = 0; for (Iterator i = this.sessions.iterator(); i.hasNext();) { @@ -656,8 +668,8 @@ public class ActiveMQConnection implemen } } finally { try { - if (asyncConnectionThread != null){ - asyncConnectionThread.shutdown(); + if (executor != null){ + executor.shutdown(); } }catch(Throwable e) { LOG.error("Error shutting down thread pool " + e,e); @@ -1719,7 +1731,7 @@ public class ActiveMQConnection implemen @Override public Response processConnectionError(final ConnectionError error) throws Exception { - asyncConnectionThread.execute(new Runnable() { + executor.execute(new Runnable() { public void run() { onAsyncException(error.getException()); } @@ -1779,7 +1791,7 @@ public class ActiveMQConnection implemen public void onClientInternalException(final Throwable error) { if ( !closed.get() && !closing.get() ) { if ( this.clientInternalExceptionListener != null ) { - asyncConnectionThread.execute(new Runnable() { + executor.execute(new Runnable() { public void run() { ActiveMQConnection.this.clientInternalExceptionListener.onException(error); } @@ -1804,7 +1816,7 @@ public class ActiveMQConnection implemen } final JMSException e = (JMSException)error; - asyncConnectionThread.execute(new Runnable() { + executor.execute(new Runnable() { public void run() { ActiveMQConnection.this.exceptionListener.onException(e); } @@ -1819,7 +1831,7 @@ public class ActiveMQConnection implemen public void onException(final IOException error) { onAsyncException(error); if (!closing.get() && !closed.get()) { - asyncConnectionThread.execute(new Runnable() { + executor.execute(new Runnable() { public void run() { transportFailed(error); ServiceSupport.dispose(ActiveMQConnection.this.transport); @@ -2297,4 +2309,12 @@ public class ActiveMQConnection implemen public long getConsumerFailoverRedeliveryWaitPeriod() { return consumerFailoverRedeliveryWaitPeriod; } + + protected Scheduler getScheduler() { + return this.scheduler; + } + + protected ThreadPoolExecutor getExecutor() { + return this.executor; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu May 20 12:02:10 2010 @@ -29,7 +29,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; - import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; @@ -37,7 +36,6 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.TransactionRolledBackException; - import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQDestination; @@ -111,7 +109,7 @@ public class ActiveMQMessageConsumer imp } private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class); - protected static final Scheduler scheduler = Scheduler.getInstance(); + protected final Scheduler scheduler; protected final ActiveMQSession session; protected final ConsumerInfo info; @@ -130,17 +128,17 @@ public class ActiveMQMessageConsumer imp private int ackCounter; private int dispatchedCount; private final AtomicReference messageListener = new AtomicReference(); - private JMSConsumerStatsImpl stats; + private final JMSConsumerStatsImpl stats; private final String selector; private boolean synchronizationRegistered; - private AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); private MessageAvailableListener availableListener; private RedeliveryPolicy redeliveryPolicy; private boolean optimizeAcknowledge; - private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); + private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); private ExecutorService executorService; private MessageTransformer transformer; private boolean clearDispatchList; @@ -152,7 +150,7 @@ public class ActiveMQMessageConsumer imp private IOException failureError; private long optimizeAckTimestamp = System.currentTimeMillis(); - private long optimizeAckTimeout = 300; + private final long optimizeAckTimeout = 300; private long failoverRedeliveryWaitPeriod = 0; /** @@ -202,6 +200,7 @@ public class ActiveMQMessageConsumer imp } this.session = session; + this.scheduler = session.getScheduler(); this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); setTransformer(session.getTransformer()); @@ -634,10 +633,12 @@ public class ActiveMQMessageConsumer imp if (!unconsumedMessages.isClosed()) { if (session.getTransactionContext().isInTransaction()) { session.getTransactionContext().addSynchronization(new Synchronization() { + @Override public void afterCommit() throws Exception { doClose(); } + @Override public void afterRollback() throws Exception { doClose(); } @@ -912,16 +913,19 @@ public class ActiveMQMessageConsumer imp if (!synchronizationRegistered) { synchronizationRegistered = true; session.getTransactionContext().addSynchronization(new Synchronization() { + @Override public void beforeEnd() throws Exception { acknowledge(); synchronizationRegistered = false; } + @Override public void afterCommit() throws Exception { commit(); synchronizationRegistered = false; } + @Override public void afterRollback() throws Exception { rollback(); synchronizationRegistered = false; @@ -1325,6 +1329,7 @@ public class ActiveMQMessageConsumer imp unconsumedMessages.stop(); } + @Override public String toString() { return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() + " }"; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Thu May 20 12:02:10 2010 @@ -19,13 +19,11 @@ package org.apache.activemq; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; - import javax.jms.Destination; import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; @@ -73,9 +71,9 @@ public class ActiveMQMessageProducer ext protected ProducerInfo info; protected boolean closed; - private JMSProducerStatsImpl stats; + private final JMSProducerStatsImpl stats; private AtomicLong messageSequence; - private long startTime; + private final long startTime; private MessageTransformer transformer; private MemoryUsage producerWindow; @@ -93,6 +91,7 @@ public class ActiveMQMessageProducer ext // size > 0 if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) { producerWindow = new MemoryUsage("Producer Window: " + producerId); + producerWindow.setExecutor(session.getConnectionExecutor()); producerWindow.setLimit(this.info.getWindowSize()); producerWindow.start(); } @@ -164,6 +163,7 @@ public class ActiveMQMessageProducer ext * * @throws IllegalStateException */ + @Override protected void checkClosed() throws IllegalStateException { if (closed) { throw new IllegalStateException("The producer is closed"); @@ -280,6 +280,7 @@ public class ActiveMQMessageProducer ext this.info = info; } + @Override public String toString() { return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }"; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu May 20 12:02:10 2010 @@ -24,8 +24,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; - import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.IllegalStateException; @@ -53,7 +53,6 @@ import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.TransactionRolledBackException; - import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobUploader; @@ -198,7 +197,8 @@ public class ActiveMQSession implements } private static final Log LOG = LogFactory.getLog(ActiveMQSession.class); - protected static final Scheduler scheduler = Scheduler.getInstance(); + private final Scheduler scheduler; + private final ThreadPoolExecutor connectionExecutor; protected int acknowledgementMode; protected final ActiveMQConnection connection; @@ -220,7 +220,7 @@ public class ActiveMQSession implements protected Object sendMutex = new Object(); private MessageListener messageListener; - private JMSSessionStatsImpl stats; + private final JMSSessionStatsImpl stats; private TransactionContext transactionContext; private DeliveryListener deliveryListener; private MessageTransformer transformer; @@ -251,7 +251,8 @@ public class ActiveMQSession implements this.connection.asyncSendPacket(info); setTransformer(connection.getTransformer()); setBlobTransferPolicy(connection.getBlobTransferPolicy()); - + this.scheduler=connection.getScheduler(); + this.connectionExecutor=connection.getExecutor(); if (connection.isStarted()) { start(); } @@ -613,11 +614,13 @@ public class ActiveMQSession implements synchronizationRegistered = true; getTransactionContext().addSynchronization(new Synchronization() { + @Override public void afterCommit() throws Exception { doClose(); synchronizationRegistered = false; } + @Override public void afterRollback() throws Exception { doClose(); synchronizationRegistered = false; @@ -846,6 +849,7 @@ public class ActiveMQSession implements if (ack.getTransactionId() != null) { getTransactionContext().addSynchronization(new Synchronization() { + @Override public void afterRollback() throws Exception { md.getMessage().onMessageRolledBack(); // ensure we don't filter this as a duplicate @@ -1947,6 +1951,7 @@ public class ActiveMQSession implements return executor.getUnconsumedMessages(); } + @Override public String toString() { return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}"; } @@ -2025,4 +2030,12 @@ public class ActiveMQSession implements syncSendPacket(ack); } } + + protected Scheduler getScheduler() { + return this.scheduler; + } + + protected ThreadPoolExecutor getConnectionExecutor() { + return this.connectionExecutor; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu May 20 12:02:10 2010 @@ -18,6 +18,7 @@ package org.apache.activemq.broker; import java.net.URI; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.Service; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -33,6 +34,7 @@ import org.apache.activemq.command.Produ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -372,6 +374,10 @@ public interface Broker extends Region, * configuration */ void nowMasterBroker(); + + Scheduler getScheduler(); + + ThreadPoolExecutor getExecutor(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Thu May 20 12:02:10 2010 @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import java.net.URI; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -40,6 +41,7 @@ import org.apache.activemq.command.Respo import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -300,4 +302,12 @@ public class BrokerFilter implements Bro ConsumerControl control) { next.processConsumerControl(consumerExchange, control); } + + public Scheduler getScheduler() { + return next.getScheduler(); + } + + public ThreadPoolExecutor getExecutor() { + return next.getExecutor(); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu May 20 12:02:10 2010 @@ -29,6 +29,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PostConstruct; @@ -78,9 +81,10 @@ import org.apache.activemq.security.Secu import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapterFactory; -import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; @@ -188,9 +192,10 @@ public class BrokerService implements Se private IOExceptionHandler ioExceptionHandler; private boolean schedulerSupport = true; private File schedulerDirectoryFile; - + private Scheduler scheduler; + private ThreadPoolExecutor executor; private boolean slave = true; - + static { String localHostName = "localhost"; try { @@ -589,6 +594,15 @@ public class BrokerService implements Se } } } + if (this.taskRunnerFactory != null) { + this.taskRunnerFactory.shutdown(); + } + if (this.scheduler != null) { + this.scheduler.stop(); + } + if (this.executor != null) { + this.executor.shutdownNow(); + } LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped"); synchronized (shutdownHooks) { for (Runnable hook : shutdownHooks) { @@ -756,9 +770,6 @@ public class BrokerService implements Se } public PersistenceAdapterFactory getPersistenceFactory() { - if (persistenceFactory == null) { - persistenceFactory = createPersistenceFactory(); - } return persistenceFactory; } @@ -848,6 +859,7 @@ public class BrokerService implements Se try { if (systemUsage == null) { systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore()); + systemUsage.setExecutor(getExecutor()); systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default // 64 // Meg @@ -869,6 +881,9 @@ public class BrokerService implements Se removeService(this.systemUsage); } this.systemUsage = memoryManager; + if (this.systemUsage.getExecutor()==null) { + this.systemUsage.setExecutor(getExecutor()); + } addService(this.systemUsage); } @@ -953,11 +968,11 @@ public class BrokerService implements Se } public TaskRunnerFactory getTaskRunnerFactory() { - if (taskRunnerFactory == null) { - taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000, + if (this.taskRunnerFactory == null) { + this.taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000, isDedicatedTaskRunner()); } - return taskRunnerFactory; + return this.taskRunnerFactory; } public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { @@ -1769,10 +1784,10 @@ public class BrokerService implements Se RegionBroker regionBroker; if (isUseJmx()) { regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), - getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor); + getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); } else { regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, - destinationInterceptor); + destinationInterceptor,getScheduler(),getExecutor()); } destinationFactory.setRegionBroker(regionBroker); regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); @@ -1850,20 +1865,20 @@ public class BrokerService implements Se protected PersistenceAdapter createPersistenceAdapter() throws IOException { if (isPersistent()) { - return getPersistenceFactory().createPersistenceAdapter(); + PersistenceAdapterFactory fac = getPersistenceFactory(); + if (fac != null) { + return fac.createPersistenceAdapter(); + }else { + KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter(); + File dir = new File(getBrokerDataDirectory(),"KahaDB"); + adaptor.setDirectory(dir); + return adaptor; + } } else { return new MemoryPersistenceAdapter(); } } - protected AMQPersistenceAdapterFactory createPersistenceFactory() { - AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory(); - factory.setDataDirectory(getBrokerDataDirectory()); - factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory()); - factory.setBrokerName(getBrokerName()); - return factory; - } - protected ObjectName createBrokerObjectName() throws IOException { try { return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" @@ -2124,6 +2139,31 @@ public class BrokerService implements Se } } } + + protected synchronized ThreadPoolExecutor getExecutor() { + if (this.executor == null) { + this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Usage Async Task"); + thread.setDaemon(true); + return thread; + } + }); + } + return this.executor; + } + + protected synchronized Scheduler getScheduler() { + if (this.scheduler==null) { + this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); + try { + this.scheduler.start(); + } catch (Exception e) { + LOG.error("Failed to start Scheduler ",e); + } + } + return this.scheduler; + } public Broker getRegionBroker() { return regionBroker; @@ -2251,7 +2291,5 @@ public class BrokerService implements Se public void setSchedulerDirectory(String schedulerDirectory) { setSchedulerDirectoryFile(new File(schedulerDirectory)); - } - - + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Thu May 20 12:02:10 2010 @@ -20,6 +20,7 @@ import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -41,6 +42,7 @@ import org.apache.activemq.command.Respo import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -283,4 +285,12 @@ public class EmptyBroker implements Brok public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { } + + public Scheduler getScheduler() { + return null; + } + + public ThreadPoolExecutor getExecutor() { + return null; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Thu May 20 12:02:10 2010 @@ -20,6 +20,7 @@ import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -41,6 +42,7 @@ import org.apache.activemq.command.Respo import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -302,4 +304,12 @@ public class ErrorBroker implements Brok ConsumerControl control) { throw new BrokerStoppedException(this.message); } + + public Scheduler getScheduler() { + throw new BrokerStoppedException(this.message); + } + + public ThreadPoolExecutor getExecutor() { + throw new BrokerStoppedException(this.message); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Thu May 20 12:02:10 2010 @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import java.net.URI; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -41,6 +42,7 @@ import org.apache.activemq.command.Respo import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -312,4 +314,12 @@ public class MutableBrokerFilter impleme getNext().processConsumerControl(consumerExchange, control); } + public Scheduler getScheduler() { + return getNext().getScheduler(); + } + + public ThreadPoolExecutor getExecutor() { + return getNext().getExecutor(); + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu May 20 12:02:10 2010 @@ -16,6 +16,28 @@ */ package org.apache.activemq.broker.jmx; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ThreadPoolExecutor; +import javax.management.InstanceNotFoundException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -41,6 +63,7 @@ import org.apache.activemq.command.Subsc import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.JMXSupport; @@ -48,27 +71,6 @@ import org.apache.activemq.util.ServiceS import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import javax.management.InstanceNotFoundException; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; public class ManagedRegionBroker extends RegionBroker { private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class); @@ -91,18 +93,20 @@ public class ManagedRegionBroker extends private Broker contextBroker; public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, - DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { - super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor); + DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { + super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor); this.managementContext = context; this.brokerObjectName = brokerObjectName; } + @Override public void start() throws Exception { super.start(); // build all existing durable subscriptions buildExistingSubscriptions(); } + @Override protected void doStop(ServiceStopper stopper) { super.doStop(stopper); // lets remove any mbeans not yet removed @@ -119,18 +123,22 @@ public class ManagedRegionBroker extends registeredMBeans.clear(); } + @Override protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } + @Override protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } + @Override protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } + @Override protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu May 20 12:02:10 2010 @@ -17,9 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; - import javax.jms.ResourceAllocationException; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Thu May 20 12:02:10 2010 @@ -55,6 +55,7 @@ public class DestinationFactoryImpl exte this.persistenceAdapter = persistenceAdapter; } + @Override public void setRegionBroker(RegionBroker broker) { if (broker == null) { throw new IllegalArgumentException("null broker"); @@ -62,6 +63,7 @@ public class DestinationFactoryImpl exte this.broker = broker; } + @Override public Set getDestinations() { return persistenceAdapter.getDestinations(); } @@ -69,6 +71,7 @@ public class DestinationFactoryImpl exte /** * @return instance of {@link Queue} or {@link Topic} */ + @Override public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception { if (destination.isQueue()) { if (destination.isTemporary()) { @@ -100,6 +103,7 @@ public class DestinationFactoryImpl exte } } + @Override public void removeDestination(Destination dest) { ActiveMQDestination destination = dest.getActiveMQDestination(); if (!destination.isTemporary()) { @@ -131,11 +135,12 @@ public class DestinationFactoryImpl exte if (broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) { - entry.configure(topic); + entry.configure(broker,topic); } } } + @Override public long getLastMessageBrokerSequenceId() throws IOException { return persistenceAdapter.getLastMessageBrokerSequenceId(); } @@ -144,6 +149,7 @@ public class DestinationFactoryImpl exte return persistenceAdapter; } + @Override public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException { return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu May 20 12:02:10 2010 @@ -23,10 +23,8 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import javax.jms.InvalidSelectorException; import javax.jms.JMSException; - import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -55,7 +53,7 @@ import org.apache.commons.logging.LogFac public abstract class PrefetchSubscription extends AbstractSubscription { private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class); - protected static final Scheduler scheduler = Scheduler.getInstance(); + protected final Scheduler scheduler; protected PendingMessageCursor pending; protected final List dispatched = new CopyOnWriteArrayList(); @@ -70,12 +68,13 @@ public abstract class PrefetchSubscripti private final Object pendingLock = new Object(); private final Object dispatchLock = new Object(); protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(); - private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); + private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException { super(broker,context, info); this.usageManager=usageManager; pending = cursor; + this.scheduler = broker.getScheduler(); } public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { @@ -230,6 +229,7 @@ public abstract class PrefetchSubscripti context.getTransaction().addSynchronization( new Synchronization() { + @Override public void afterCommit() throws Exception { synchronized(dispatchLock) { @@ -239,6 +239,7 @@ public abstract class PrefetchSubscripti } } + @Override public void afterRollback() throws Exception { synchronized(dispatchLock) { if (isSlave()) { @@ -486,6 +487,7 @@ public abstract class PrefetchSubscripti return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9); } + @Override public int countBeforeFull() { return info.getPrefetchSize() + prefetchExtension - dispatched.size(); } @@ -510,6 +512,7 @@ public abstract class PrefetchSubscripti return enqueueCounter; } + @Override public boolean isRecoveryRequired() { return pending.isRecoveryRequired(); } @@ -526,13 +529,15 @@ public abstract class PrefetchSubscripti } } - public void add(ConnectionContext context, Destination destination) throws Exception { + @Override +public void add(ConnectionContext context, Destination destination) throws Exception { synchronized(pendingLock) { super.add(context, destination); pending.add(context, destination); } } + @Override public List remove(ConnectionContext context, Destination destination) throws Exception { List rc = new ArrayList(); synchronized(pendingLock) { @@ -546,7 +551,7 @@ public abstract class PrefetchSubscripti synchronized(dispatchLock) { for (MessageReference r : dispatched) { if( r.getRegionDestination() == destination) { - rc.add((QueueMessageReference)r); + rc.add(r); } } destination.getDestinationStatistics().getDispatched().subtract(dispatched.size()); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu May 20 12:02:10 2010 @@ -125,7 +125,7 @@ public class Queue extends BaseDestinati }; private final Object iteratingMutex = new Object() {}; - private static final Scheduler scheduler = Scheduler.getInstance(); + private final Scheduler scheduler; class TimeoutMessage implements Delayed { @@ -203,6 +203,7 @@ public class Queue extends BaseDestinati super(brokerService, store, destination, parentStats); this.taskFactory = taskFactory; this.dispatchSelector = new QueueDispatchSelector(destination); + this.scheduler = brokerService.getBroker().getScheduler(); } public List getConsumers() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu May 20 12:02:10 2010 @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadPoolExecutor; import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import org.apache.activemq.broker.Broker; @@ -57,6 +58,7 @@ import org.apache.activemq.command.Respo import org.apache.activemq.command.TransactionId; import org.apache.activemq.state.ConnectionState; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.BrokerSupport; @@ -98,10 +100,14 @@ public class RegionBroker extends EmptyB private final Map clientIdSet = new HashMap(); private final DestinationInterceptor destinationInterceptor; private ConnectionContext adminConnectionContext; + private final Scheduler scheduler; + private final ThreadPoolExecutor executor; public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, - DestinationInterceptor destinationInterceptor) throws IOException { + DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { this.brokerService = brokerService; + this.executor=executor; + this.scheduler = scheduler; if (destinationFactory == null) { throw new IllegalArgumentException("null destinationFactory"); } @@ -810,6 +816,16 @@ public class RegionBroker extends EmptyB } } + + @Override + public Scheduler getScheduler() { + return this.scheduler; + } + + public ThreadPoolExecutor getExecutor() { + return this.executor; + } + @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { ActiveMQDestination destination = control.getDestination(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Thu May 20 12:02:10 2010 @@ -231,7 +231,7 @@ public class TopicRegion extends Abstrac if (broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) { - entry.configure(topic); + entry.configure(broker,topic); } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Thu May 20 12:02:10 2010 @@ -5,7 +5,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Subscription; @@ -24,15 +23,19 @@ public class AbortSlowConsumerStrategy i private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class); - private static final Scheduler scheduler = Scheduler.getInstance(); - private AtomicBoolean taskStarted = new AtomicBoolean(false); - private Map slowConsumers = new ConcurrentHashMap(); + private Scheduler scheduler; + private final AtomicBoolean taskStarted = new AtomicBoolean(false); + private final Map slowConsumers = new ConcurrentHashMap(); private long maxSlowCount = -1; private long maxSlowDuration = 30*1000; private long checkPeriod = 30*1000; private boolean abortConnection = false; + public void setScheduler(Scheduler s) { + this.scheduler=s; + } + public void slowConsumer(ConnectionContext context, Subscription subs) { if (maxSlowCount < 0 && maxSlowDuration < 0) { // nothing to do Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010 @@ -18,7 +18,7 @@ package org.apache.activemq.broker.regio import java.util.ArrayList; import java.util.List; - +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -118,4 +118,7 @@ public class FixedCountSubscriptionRecov return result.toArray(new Message[result.size()]); } + public void setBroker(Broker broker) { + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010 @@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio import java.util.Iterator; import java.util.List; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -109,6 +110,9 @@ public class FixedSizedSubscriptionRecov public Message[] browse(ActiveMQDestination destination) throws Exception { return buffer.browse(destination); } + + public void setBroker(Broker broker) { + } // Implementation methods Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010 @@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio import java.util.ArrayList; import java.util.List; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -68,5 +69,8 @@ public class LastImageSubscriptionRecove public SubscriptionRecoveryPolicy copy() { return new LastImageSubscriptionRecoveryPolicy(); } + + public void setBroker(Broker broker) { + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -52,5 +53,8 @@ public class NoSubscriptionRecoveryPolic public Message[] browse(ActiveMQDestination dest) throws Exception { return new Message[0]; } + + public void setBroker(Broker broker) { + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu May 20 12:02:10 2010 @@ -90,7 +90,7 @@ public class PolicyEntry extends Destina public void configure(Broker broker,Queue queue) { - baseConfiguration(queue); + baseConfiguration(broker,queue); if (dispatchPolicy != null) { queue.setDispatchPolicy(dispatchPolicy); } @@ -112,14 +112,16 @@ public class PolicyEntry extends Destina queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); } - public void configure(Topic topic) { - baseConfiguration(topic); + public void configure(Broker broker,Topic topic) { + baseConfiguration(broker,topic); if (dispatchPolicy != null) { topic.setDispatchPolicy(dispatchPolicy); } topic.setDeadLetterStrategy(getDeadLetterStrategy()); if (subscriptionRecoveryPolicy != null) { - topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy()); + SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy(); + srp.setBroker(broker); + topic.setSubscriptionRecoveryPolicy(srp); } if (memoryLimit > 0) { topic.getMemoryUsage().setLimit(memoryLimit); @@ -127,7 +129,7 @@ public class PolicyEntry extends Destina topic.setLazyDispatch(isLazyDispatch()); } - public void baseConfiguration(BaseDestination destination) { + public void baseConfiguration(Broker broker,BaseDestination destination) { destination.setProducerFlowControl(isProducerFlowControl()); destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); destination.setEnableAudit(isEnableAudit()); @@ -148,7 +150,11 @@ public class PolicyEntry extends Destina destination.setMaxExpirePageSize(getMaxExpirePageSize()); destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); - destination.setSlowConsumerStrategy(getSlowConsumerStrategy()); + SlowConsumerStrategy scs = getSlowConsumerStrategy(); + if (scs != null) { + scs.setScheduler(broker.getScheduler()); + } + destination.setSlowConsumerStrategy(scs); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010 @@ -17,12 +17,11 @@ package org.apache.activemq.broker.region.policy; import java.util.concurrent.atomic.AtomicLong; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import org.apache.activemq.ActiveMQMessageTransformation; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -50,9 +49,9 @@ public class QueryBasedSubscriptionRecov private static final Log LOG = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class); private MessageQuery query; - private AtomicLong messageSequence = new AtomicLong(0); - private IdGenerator idGenerator = new IdGenerator(); - private ProducerId producerId = createProducerId(); + private final AtomicLong messageSequence = new AtomicLong(0); + private final IdGenerator idGenerator = new IdGenerator(); + private final ProducerId producerId = createProducerId(); public SubscriptionRecoveryPolicy copy() { QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy(); @@ -99,6 +98,9 @@ public class QueryBasedSubscriptionRecov public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception { return new org.apache.activemq.command.Message[0]; } + + public void setBroker(Broker broker) { + } protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) { try { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Thu May 20 12:02:10 2010 @@ -2,6 +2,7 @@ package org.apache.activemq.broker.regio import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.thread.Scheduler; /* * a strategy for dealing with slow consumers @@ -9,5 +10,6 @@ import org.apache.activemq.broker.region public interface SlowConsumerStrategy { void slowConsumer(ConnectionContext context, Subscription subs); + void setScheduler(Scheduler scheduler); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010 @@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio import org.apache.activemq.Service; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -69,4 +70,6 @@ public interface SubscriptionRecoveryPol * @return the copy */ SubscriptionRecoveryPolicy copy(); + + void setBroker(Broker broker); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010 @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -28,7 +29,6 @@ import org.apache.activemq.broker.region import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.thread.Scheduler; /** @@ -42,7 +42,7 @@ import org.apache.activemq.thread.Schedu public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { private static final int GC_INTERVAL = 1000; - protected static final Scheduler scheduler = Scheduler.getInstance(); + private Scheduler scheduler; // TODO: need to get a better synchronized linked list that has little // contention between enqueuing and dequeuing @@ -89,6 +89,10 @@ public class TimedSubscriptionRecoveryPo } } } + + public void setBroker(Broker broker) { + this.scheduler = broker.getScheduler(); + } public void start() throws Exception { scheduler.executePeriodically(gcTask, GC_INTERVAL); @@ -97,6 +101,7 @@ public class TimedSubscriptionRecoveryPo public void stop() throws Exception { scheduler.cancel(gcTask); } + public void gc() { lastGCRun = System.currentTimeMillis(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Thu May 20 12:02:10 2010 @@ -35,7 +35,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand; import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey; import org.apache.activemq.thread.Scheduler; @@ -75,7 +74,7 @@ public class AsyncDataManager { public static final int PREFERED_DIFF = 1024 * 512; private static final Log LOG = LogFactory.getLog(AsyncDataManager.class); - protected static Scheduler scheduler = Scheduler.getInstance(); + protected Scheduler scheduler; protected final Map inflightWrites = new ConcurrentHashMap(); @@ -193,7 +192,13 @@ public class AsyncDataManager { cleanup(); } }; - scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); + this.scheduler = new Scheduler("AsyncDataManager Scheduler"); + try { + this.scheduler.start(); + } catch (Exception e) { + throw new IOException(e); + } + this.scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); } public void lock() throws IOException { @@ -328,7 +333,12 @@ public class AsyncDataManager { if (!started) { return; } - scheduler.cancel(cleanupTask); + this.scheduler.cancel(cleanupTask); + try { + this.scheduler.stop(); + } catch (Exception e) { + throw new IOException(e); + } accessorPool.close(); storeState(false); appender.close(); @@ -376,7 +386,7 @@ public class AsyncDataManager { public synchronized void addInterestInFile(int file) throws IOException { if (file >= 0) { Integer key = Integer.valueOf(file); - DataFile dataFile = (DataFile)fileMap.get(key); + DataFile dataFile = fileMap.get(key); if (dataFile == null) { throw new IOException("That data file does not exist"); } @@ -393,7 +403,7 @@ public class AsyncDataManager { public synchronized void removeInterestInFile(int file) throws IOException { if (file >= 0) { Integer key = Integer.valueOf(file); - DataFile dataFile = (DataFile)fileMap.get(key); + DataFile dataFile = fileMap.get(key); removeInterestInFile(dataFile); } @@ -414,7 +424,7 @@ public class AsyncDataManager { List purgeList = new ArrayList(); for (Integer key : unUsed) { - DataFile dataFile = (DataFile)fileMap.get(key); + DataFile dataFile = fileMap.get(key); purgeList.add(dataFile); } for (DataFile dataFile : purgeList) { @@ -432,7 +442,7 @@ public class AsyncDataManager { for (Integer key : unUsed) { // Only add files less than the lastFile.. if( key.intValue() < lastFile.intValue() ) { - DataFile dataFile = (DataFile)fileMap.get(key); + DataFile dataFile = fileMap.get(key); purgeList.add(dataFile); } } @@ -499,6 +509,7 @@ public class AsyncDataManager { this.maxFileLength = maxFileLength; } + @Override public String toString() { return "DataManager:(" + filePrefix + ")"; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=946600&r1=946599&r2=946600&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu May 20 12:02:10 2010 @@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLat import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.activeio.journal.Journal; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -58,7 +57,6 @@ import org.apache.activemq.store.TopicMe import org.apache.activemq.store.TopicReferenceStore; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; @@ -85,7 +83,7 @@ import org.apache.commons.logging.LogFac public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware { private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class); - private static final Scheduler scheduler = Scheduler.getInstance(); + private Scheduler scheduler; private final ConcurrentHashMap queues = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap(); private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; @@ -99,7 +97,7 @@ public class AMQPersistenceAdapter imple private SystemUsage usageManager; private long checkpointInterval = 1000 * 20; private int maxCheckpointMessageAddSize = 1024 * 4; - private AMQTransactionStore transactionStore = new AMQTransactionStore(this); + private final AMQTransactionStore transactionStore = new AMQTransactionStore(this); private TaskRunner checkpointTask; private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private final AtomicBoolean started = new AtomicBoolean(false); @@ -112,7 +110,7 @@ public class AMQPersistenceAdapter imple private File directory; private File directoryArchive; private BrokerService brokerService; - private AtomicLong storeSize = new AtomicLong(); + private final AtomicLong storeSize = new AtomicLong(); private boolean persistentIndex=true; private boolean useNio = true; private boolean archiveDataLogs=false; @@ -124,8 +122,7 @@ public class AMQPersistenceAdapter imple private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH; - private Map> dataFilesInProgress = new ConcurrentHashMap> (); - private String directoryPath = ""; + private final Map> dataFilesInProgress = new ConcurrentHashMap> (); private RandomAccessFile lockFile; private FileLock lock; private boolean disableLocking = DISABLE_LOCKING; @@ -134,6 +131,8 @@ public class AMQPersistenceAdapter imple private boolean lockAquired; private boolean recoverReferenceStore=true; private boolean forceRecoverReferenceStore=false; + private boolean useDedicatedTaskRunner=false; + private int journalThreadPriority = Thread.MAX_PRIORITY; public String getBrokerName() { return this.brokerName; @@ -165,12 +164,19 @@ public class AMQPersistenceAdapter imple } else { this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName)); this.directory = new File(directory, "amqstore"); - this.directoryPath=directory.getAbsolutePath(); + directory.getAbsolutePath(); } } if (this.directoryArchive == null) { this.directoryArchive = new File(this.directory,"archive"); } + if (this.brokerService != null) { + this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory(); + }else { + this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler"); + } + this.taskRunnerFactory= new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(), + true, 1000, isUseDedicatedTaskRunner()); IOHelper.mkdirs(this.directory); lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); lock(); @@ -192,10 +198,11 @@ public class AMQPersistenceAdapter imple referenceStoreAdapter.setBrokerName(getBrokerName()); referenceStoreAdapter.setUsageManager(usageManager); referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength()); - if (taskRunnerFactory == null) { - taskRunnerFactory = createTaskRunnerFactory(); - } + if (brokerService != null) { + this.scheduler = this.brokerService.getBroker().getScheduler(); + } + if (failIfJournalIsLocked) { asyncDataManager.lock(); } else { @@ -389,7 +396,7 @@ public class AMQPersistenceAdapter imple Iterator queueIterator = queues.values().iterator(); while (queueIterator.hasNext()) { final AMQMessageStore ms = queueIterator.next(); - Location mark = (Location)ms.getMark(); + Location mark = ms.getMark(); if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { newMark = mark; } @@ -397,7 +404,7 @@ public class AMQPersistenceAdapter imple Iterator topicIterator = topics.values().iterator(); while (topicIterator.hasNext()) { final AMQTopicMessageStore ms = topicIterator.next(); - Location mark = (Location)ms.getMark(); + Location mark = ms.getMark(); if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { newMark = mark; } @@ -726,6 +733,7 @@ public class AMQPersistenceAdapter imple deleteAllMessages = true; } + @Override public String toString() { return "AMQPersistenceAdapter(" + directory + ")"; } @@ -754,10 +762,6 @@ public class AMQPersistenceAdapter imple return adaptor; } - protected TaskRunnerFactory createTaskRunnerFactory() { - return DefaultThreadPools.getDefaultTaskRunnerFactory(); - } - // ///////////////////////////////////////////////////////////////// // Property Accessors // ///////////////////////////////////////////////////////////////// @@ -991,6 +995,28 @@ public class AMQPersistenceAdapter imple public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) { this.forceRecoverReferenceStore = forceRecoverReferenceStore; } + + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + + /** + * @return the journalThreadPriority + */ + public int getJournalThreadPriority() { + return this.journalThreadPriority; + } + + /** + * @param journalThreadPriority the journalThreadPriority to set + */ + public void setJournalThreadPriority(int journalThreadPriority) { + this.journalThreadPriority = journalThreadPriority; + } protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {