activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r551271 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/ broker/region/ broker/region/cursors/ broker/region/policy/ command/ jndi/ kaha/impl/async/ kaha/impl/container/ kaha/impl/data/ kaha/impl/index/hash/ ...
Date Wed, 27 Jun 2007 18:53:33 GMT
Author: rajdavies
Date: Wed Jun 27 11:53:30 2007
New Revision: 551271

URL: http://svn.apache.org/viewvc?view=rev&rev=551271
Log:
ensure member variables are always synchronized 

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Jun 27 11:53:30 2007
@@ -591,7 +591,7 @@
         props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
         props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled()));
         props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend()));
-        props.setProperty("producerWindowSize", Integer.toString(producerWindowSize));
+        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
     }
 
     public boolean isUseCompression() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jun 27 11:53:30 2007
@@ -136,7 +136,6 @@
     private transient Thread shutdownHook;
     private String[] transportConnectorURIs;
     private String[] networkConnectorURIs;
-    private String[] proxyConnectorURIs;
     private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems
     private boolean deleteAllMessagesOnStartup;
     private boolean advisorySupport = true;
@@ -1126,13 +1125,7 @@
                 addNetworkConnector(uri);
             }
         }
-        if (proxyConnectorURIs != null) {
-            for (int i = 0; i < proxyConnectorURIs.length; i++) {
-                String uri = proxyConnectorURIs[i];
-                addProxyConnector(uri);
-            }
-        }
-        
+               
         if (jmsBridgeConnectors != null){
             for (int i = 0; i < jmsBridgeConnectors.length; i++){
                 addJmsConnector(jmsBridgeConnectors[i]);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Jun 27 11:53:30 2007
@@ -349,7 +349,7 @@
         return null;
     }
 
-    public Response processWireFormat(WireFormatInfo info) throws Exception{
+    public synchronized Response processWireFormat(WireFormatInfo info) throws Exception{
         wireFormatInfo=info;
     	protocolVersion.set(info.getVersion());
         return null;
@@ -370,6 +370,9 @@
         if(cs!=null){
             context=cs.getContext();
         }
+        if (cs == null) {
+            throw new NullPointerException("Context is null");
+        }
         // Avoid replaying dup commands
         if(cs.getTransactionState(info.getTransactionId())==null){
             cs.addTransactionState(info.getTransactionId());
@@ -391,6 +394,9 @@
         if(cs!=null){
             context=cs.getContext();
         }
+        if (cs == null) {
+            throw new NullPointerException("Context is null");
+        }
         TransactionState transactionState=cs.getTransactionState(info.getTransactionId());
         if(transactionState==null)
             throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
@@ -414,6 +420,9 @@
         if(cs!=null){
             context=cs.getContext();
         }
+        if (cs == null) {
+            throw new NullPointerException("Context is null");
+        }
         cs.removeTransactionState(info.getTransactionId());
         broker.commitTransaction(context,info.getTransactionId(),true);
         return null;
@@ -425,6 +434,9 @@
         if(cs!=null){
             context=cs.getContext();
         }
+        if (cs == null) {
+            throw new NullPointerException("Context is null");
+        }
         cs.removeTransactionState(info.getTransactionId());
         broker.commitTransaction(context,info.getTransactionId(),false);
         return null;
@@ -436,6 +448,9 @@
         if(cs!=null){
             context=cs.getContext();
         }
+        if (cs == null) {
+            throw new NullPointerException("Context is null");
+        }
         cs.removeTransactionState(info.getTransactionId());
         broker.rollbackTransaction(context,info.getTransactionId());
         return null;
@@ -869,17 +884,20 @@
             log.debug("Stopping connection: "+transport.getRemoteAddress());
             connector.onStopped(this);
             try{
-                if(masterBroker!=null){
-                    masterBroker.stop();
-                }
-                if (duplexBridge != null) {
-                    duplexBridge.stop();
-                }
-                // If the transport has not failed yet,
-                // notify the peer that we are doing a normal shutdown.
-                if(transportException==null){
-                    transport.oneway(new ShutdownInfo());
+                synchronized(this){
+                    if(masterBroker!=null){
+                        masterBroker.stop();
+                    }
+                    if(duplexBridge!=null){
+                        duplexBridge.stop();
+                    }
+                    // If the transport has not failed yet,
+                    // notify the peer that we are doing a normal shutdown.
+                    if(transportException==null){
+                        transport.oneway(new ShutdownInfo());
+                    }
                 }
+                
             }catch(Exception ignore){
                 log.trace("Exception caught stopping",ignore);
             }
@@ -1069,7 +1087,7 @@
         this.pendingStop=pendingStop;
     }
 
-    public Response processBrokerInfo(BrokerInfo info){
+    public synchronized Response processBrokerInfo(BrokerInfo info){
         if(info.isSlaveBroker()){
             // stream messages from this broker (the master) to
             // the slave
@@ -1172,8 +1190,10 @@
             synchronized(consumerExchanges){
                 result=new ConsumerBrokerExchange();
                 ConnectionState state=lookupConnectionState(id);
-                context=state.getContext();
-                result.setConnectionContext(context);
+                synchronized(this){
+                    context=state.getContext();
+                    result.setConnectionContext(context);
+                }
                 SessionState ss=state.getSessionState(id.getParentId());
                 if(ss!=null){
                     ConsumerState cs=ss.getConsumerState(id);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Jun 27 11:53:30 2007
@@ -54,7 +54,7 @@
         return active;
     }
 
-    protected boolean isFull(){
+    protected synchronized boolean isFull(){
         return !active||super.isFull();
     }
 
@@ -113,25 +113,23 @@
                 topic.deactivate(context,this);
             }
         }
-        synchronized(dispatched){
-            for(Iterator iter=dispatched.iterator();iter.hasNext();){
-                // Mark the dispatched messages as redelivered for next time.
-                MessageReference node=(MessageReference)iter.next();
-                Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
-                if(count!=null){
-                    redeliveredMessages.put(node.getMessageId(),Integer.valueOf(count.intValue()+1));
-                }else{
-                    redeliveredMessages.put(node.getMessageId(),Integer.valueOf(1));
-                }
-                if(keepDurableSubsActive){
-                    synchronized(pending){
-                        pending.addMessageFirst(node);
-                    }
-                }else{
-                    node.decrementReferenceCount();
+        for(Iterator iter=dispatched.iterator();iter.hasNext();){
+            // Mark the dispatched messages as redelivered for next time.
+            MessageReference node=(MessageReference)iter.next();
+            Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
+            if(count!=null){
+                redeliveredMessages.put(node.getMessageId(),Integer.valueOf(count.intValue()+1));
+            }else{
+                redeliveredMessages.put(node.getMessageId(),Integer.valueOf(1));
+            }
+            if(keepDurableSubsActive){
+                synchronized(pending){
+                    pending.addMessageFirst(node);
                 }
-                iter.remove();
+            }else{
+                node.decrementReferenceCount();
             }
+            iter.remove();
         }
         if(!keepDurableSubsActive){
             synchronized(pending){
@@ -167,11 +165,11 @@
         super.add(node);
     }
 
-    protected void doAddRecoveredMessage(MessageReference message) throws Exception{
+    protected synchronized void doAddRecoveredMessage(MessageReference message) throws Exception{
         pending.addRecoveredMessage(message);
     }
 
-    public int getPendingQueueSize(){
+    public synchronized int getPendingQueueSize(){
         if(active||keepDurableSubsActive){
             return super.getPendingQueueSize();
         }
@@ -184,7 +182,7 @@
                 "You cannot dynamically change the selector for durable topic subscriptions");
     }
 
-    protected boolean canDispatch(MessageReference node){
+    protected synchronized boolean canDispatch(MessageReference node){
         return active;
     }
 
@@ -198,7 +196,7 @@
         return subscriptionKey.getSubscriptionName();
     }
 
-    public String toString(){
+    public synchronized String toString(){
         return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
                 +", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter
                 +", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension;
@@ -215,7 +213,7 @@
     /**
      * Release any references that we are holding.
      */
-    public void destroy(){
+    public synchronized void destroy(){
         try{
             synchronized(pending){
                 pending.reset();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Wed Jun 27 11:53:30 2007
@@ -76,7 +76,7 @@
         
         this.referenceCount=1;
         message.incrementReferenceCount();     
-        this.cachedSize = message != null ? message.getSize() : 0;
+        this.cachedSize = message.getSize();
     }
     
     synchronized public Message getMessageHardRef() {
@@ -212,7 +212,7 @@
         return false;
     }
 
-    public int getSize(){
+    public synchronized int getSize(){
        Message msg = message;
        if (msg != null){
            return msg.getSize();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jun 27 11:53:30 2007
@@ -104,7 +104,7 @@
      * Occurs when a pull times out. If nothing has been dispatched since the timeout was setup, then send the NULL
      * message.
      */
-    private synchronized void pullTimeout(long dispatchCounterBeforePull){
+    final synchronized void pullTimeout(long dispatchCounterBeforePull){
         if(dispatchCounterBeforePull==dispatchCounter){
             try{
                 add(QueueMessageReference.NULL_MESSAGE);
@@ -300,14 +300,14 @@
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
-    public boolean isLowWaterMark(){
+    public synchronized boolean isLowWaterMark(){
         return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4);
     }
 
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
-    public boolean isHighWaterMark(){
+    public synchronized boolean isHighWaterMark(){
         return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9);
     }
 
@@ -315,16 +315,12 @@
         return info.getPrefetchSize()+prefetchExtension-dispatched.size();
     }
 
-    public int getPendingQueueSize(){
-        synchronized(pending){
-            return pending.size();
-        }
+    public synchronized int getPendingQueueSize(){
+        return pending.size();
     }
 
-    public int getDispatchedQueueSize(){
-        synchronized(dispatched){
-            return dispatched.size();
-        }
+    public synchronized int getDispatchedQueueSize(){
+        return dispatched.size();
     }
 
     synchronized public long getDequeueCounter(){
@@ -344,11 +340,11 @@
     }
     
    
-    public PendingMessageCursor getPending(){
+    public synchronized PendingMessageCursor getPending(){
         return this.pending;
     }
 
-    public void setPending(PendingMessageCursor pending){
+    public synchronized void setPending(PendingMessageCursor pending){
         this.pending=pending;
     }
     
@@ -413,7 +409,7 @@
         }
     }
 
-    protected boolean dispatch(final MessageReference node) throws IOException{
+    protected synchronized boolean dispatch(final MessageReference node) throws IOException{
         final Message message=node.getMessage();
         if(message==null){
             return false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Jun 27 11:53:30 2007
@@ -47,7 +47,6 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.kaha.Store;
@@ -421,7 +420,7 @@
         doMessageSend(producerExchange, message);
     }
 
-	private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+	void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
 		final ConnectionContext context = producerExchange.getConnectionContext();
 		message.setRegionDestination(this);
         if(store!=null&&message.isPersistent()){
@@ -979,7 +978,7 @@
     }
     
       
-    private void sendMessage(final ConnectionContext context,Message msg) throws Exception{
+    final void sendMessage(final ConnectionContext context,Message msg) throws Exception{
         synchronized(messages){
             messages.addMessageLast(msg);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Jun 27 11:53:30 2007
@@ -39,7 +39,7 @@
         return !((QueueMessageReference)node).isAcked();
     }
     
-    public String toString() {
+    public synchronized String toString() {
         return 
             "QueueBrowserSubscription:" +
             " consumer="+info.getConsumerId()+

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Jun 27 11:53:30 2007
@@ -139,7 +139,7 @@
         }
     }
     
-    public String toString() {
+    public synchronized String toString() {
         return 
             "QueueSubscription:" +
             " consumer="+info.getConsumerId()+

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Jun 27 11:53:30 2007
@@ -341,7 +341,7 @@
         doMessageSend(producerExchange, message);
     }
 
-	private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+	void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
 		final ConnectionContext context = producerExchange.getConnectionContext();
 		message.setRegionDestination(this);
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Jun 27 11:53:30 2007
@@ -166,7 +166,7 @@
         nonPersistent.addMessageLast(node);
     }
 
-    public void clear(){
+    public synchronized void clear(){
         pendingCount=0;
         nonPersistent.clear();
         for(PendingMessageCursor tsp: storePrefetches){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Wed Jun 27 11:53:30 2007
@@ -90,7 +90,7 @@
         }
     }
     
-    public void addMessageFirst(MessageReference node) throws Exception{
+    public synchronized void addMessageFirst(MessageReference node) throws Exception{
         if(node!=null){
             Message msg=node.getMessage();
             if(started){
@@ -105,7 +105,7 @@
         }
     }
 
-    public void clear(){
+    public synchronized void clear(){
         pendingCount=0;
     }
 
@@ -150,7 +150,7 @@
         persistent.reset();
     }
 
-    public int size(){
+    public synchronized int size(){
         return pendingCount;
     }
 
@@ -165,25 +165,25 @@
      * @see org.apache.activemq.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
-    public boolean isRecoveryRequired(){
+    public synchronized boolean isRecoveryRequired(){
         return false;
     }
 
     /**
      * @return the nonPersistent Cursor
      */
-    public PendingMessageCursor getNonPersistent(){
+    public synchronized PendingMessageCursor getNonPersistent(){
         return this.nonPersistent;
     }
 
     /**
      * @param nonPersistent cursor to set
      */
-    public void setNonPersistent(PendingMessageCursor nonPersistent){
+    public synchronized void setNonPersistent(PendingMessageCursor nonPersistent){
         this.nonPersistent=nonPersistent;
     }
 
-    public void setMaxBatchSize(int maxBatchSize){
+    public synchronized void setMaxBatchSize(int maxBatchSize){
         persistent.setMaxBatchSize(maxBatchSize);
         if(nonPersistent!=null){
             nonPersistent.setMaxBatchSize(maxBatchSize);
@@ -191,7 +191,7 @@
         super.setMaxBatchSize(maxBatchSize);
     }
     
-    public void gc() {
+    public synchronized void gc() {
         if (persistent != null) {
             persistent.gc();
         }
@@ -200,7 +200,7 @@
         }
     }
     
-    public void setUsageManager(UsageManager usageManager){
+    public synchronized void setUsageManager(UsageManager usageManager){
         super.setUsageManager(usageManager);
         if (persistent != null) {
             persistent.setUsageManager(usageManager);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Jun 27 11:53:30 2007
@@ -80,7 +80,7 @@
     /**
      * @return true if there are no pendingCount messages
      */
-    public boolean isEmpty(){
+    public synchronized boolean isEmpty(){
         return pendingCount <= 0;
     }
 
@@ -99,7 +99,7 @@
         }
     }
     
-    public void addMessageFirst(MessageReference node) throws Exception{
+    public synchronized void addMessageFirst(MessageReference node) throws Exception{
         if(node!=null){
             if(started){
                 firstMessageId=node.getMessageId();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Wed Jun 27 11:53:30 2007
@@ -92,7 +92,7 @@
         this.maximumSize=maximumSize;
     }
 
-    public Message[] browse(ActiveMQDestination destination) throws Exception{
+    public synchronized Message[] browse(ActiveMQDestination destination) throws Exception{
         List result=new ArrayList();
         DestinationFilter filter=DestinationFilter.parseFilter(destination);
         int t=tail;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Wed Jun 27 11:53:30 2007
@@ -90,7 +90,7 @@
         return IntrospectionSupport.toString(this);
     }
     
-    public int hasCode() {
+    public int hashCode() {
         int h1 = clientId != null ? clientId.hashCode():-1;
         int h2 = subcriptionName != null ? subcriptionName.hashCode():-1;
         return h1 ^ h2;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java Wed Jun 27 11:53:30 2007
@@ -389,6 +389,7 @@
     }
 
     private class ListEnumeration extends LocalNamingEnumeration {
+        ListEnumeration(){}
         public Object next() throws NamingException {
             return nextElement();
         }
@@ -400,6 +401,8 @@
     }
 
     private class ListBindingEnumeration extends LocalNamingEnumeration {
+        ListBindingEnumeration(){
+        }
         public Object next() throws NamingException {
             return nextElement();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Wed Jun 27 11:53:30 2007
@@ -61,8 +61,8 @@
     
     public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE;
 
-    public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; // 
-    public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; // 
+    static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; // 
+    static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; // 
     
     public static final byte DATA_ITEM_TYPE=1;
     public static final byte REDO_ITEM_TYPE=2;
@@ -71,8 +71,8 @@
     public static final String DEFAULT_FILE_PREFIX="data-";
     public static final int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
     
-    private File directory = new File(DEFAULT_DIRECTORY);
-    private String filePrefix=DEFAULT_FILE_PREFIX;
+    File directory = new File(DEFAULT_DIRECTORY);
+    String filePrefix=DEFAULT_FILE_PREFIX;
     private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
     private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512;
     
@@ -101,8 +101,10 @@
     	
     	started=true;
     	directory.mkdirs();
-    	controlFile = new ControlFile(new File(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH);
-    	controlFile.lock();
+        synchronized(this){
+            controlFile=new ControlFile(new File(directory,filePrefix+"control"),CONTROL_RECORD_MAX_LENGTH);
+            controlFile.lock();
+        }
     	
     	ByteSequence sequence = controlFile.load();
     	if( sequence != null && sequence.getLength()>0 ) {
@@ -116,7 +118,7 @@
 
         File[] files=directory.listFiles(new FilenameFilter(){
             public boolean accept(File dir,String n){
-                return dir.equals(dir)&&n.startsWith(filePrefix);
+                return dir.equals(directory)&&n.startsWith(filePrefix);
             }
         });
         
@@ -252,8 +254,8 @@
     }
 
     DataFile getDataFile(Location item) throws IOException{
-        Integer key=new Integer(item.getDataFileId());
-        DataFile dataFile=(DataFile) fileMap.get(key);
+        Integer key= Integer.valueOf(item.getDataFileId());
+        DataFile dataFile=fileMap.get(key);
         if(dataFile==null){
             log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
             throw new IOException("Could not locate data file "+filePrefix+"-"+item.getDataFileId());
@@ -265,14 +267,12 @@
 		return (DataFile) dataFile.getNext();
 	}
 
-    public void close() throws IOException{
-        synchronized(this){
-            if(!started){
-                return;
-            }
-            Scheduler.cancel(cleanupTask);
-            accessorPool.close();
+    public synchronized void close() throws IOException{
+        if(!started){
+            return;
         }
+        Scheduler.cancel(cleanupTask);
+        accessorPool.close();
         storeState(false);
         appender.close();
         fileMap.clear();
@@ -281,7 +281,7 @@
         started=false;
     }
 
-	private synchronized void cleanup() {
+	synchronized void cleanup() {
 		if( accessorPool!=null ) {
 			accessorPool.disposeUnused();
 		}
@@ -375,7 +375,7 @@
 		}
     }
 
-    private void removeDataFile(DataFile dataFile) throws IOException{
+    private synchronized void removeDataFile(DataFile dataFile) throws IOException{
 
     	// Make sure we don't delete too much data.
         if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) {
@@ -414,7 +414,7 @@
 		return mark;
 	}
 
-	public Location getNextLocation(Location location) throws IOException, IllegalStateException {
+	public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
 			
 			
 			Location cur = null;
@@ -492,17 +492,17 @@
 		storeState(sync);
 	}
 
-	private void storeState(boolean sync) throws IOException {
-		ByteSequence state = marshallState();
-		appender.storeItem(state, Location.MARK_TYPE, sync);
-		controlFile.store(state, sync);
-	}
+	private synchronized void storeState(boolean sync) throws IOException{
+        ByteSequence state=marshallState();
+        appender.storeItem(state,Location.MARK_TYPE,sync);
+        controlFile.store(state,sync);
+    }
 
-	public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+	public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
         return appender.storeItem(data, Location.USER_TYPE, sync);
 	}
 	
-	public Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
+	public  synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
         return appender.storeItem(data, type, sync);
 	}
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Wed Jun 27 11:53:30 2007
@@ -27,7 +27,7 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-class DataFile extends LinkedNode implements Comparable {
+class DataFile extends LinkedNode implements Comparable<DataFile> {
 	
     private final File file;
     private final Integer dataFileId;
@@ -39,7 +39,7 @@
     DataFile(File file, int number, int preferedSize){
         this.file=file;
 		this.preferedSize = preferedSize;
-        this.dataFileId=new Integer(number);
+        this.dataFileId=Integer.valueOf(number);
         length=(int)(file.exists()?file.length():0);
     }
 
@@ -98,10 +98,17 @@
         return file.delete();
     }
 
-	public int compareTo(Object o) {
-		DataFile df = (DataFile) o;
+	public int compareTo(DataFile df) {
 		return dataFileId - df.dataFileId;
 	}
+    
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof DataFile) {
+            result = compareTo((DataFile)o)==0;
+        }
+        return result;
+    }
 
 
 	

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Wed Jun 27 11:53:30 2007
@@ -55,10 +55,13 @@
 	    	return hash;  
 	    }
 	    
-	    public boolean equals(Object obj) {
-	    	WriteKey di = (WriteKey)obj;
-	    	return di.file == file && di.offset == offset;
-	    }
+	    public boolean equals(Object obj){
+            if(obj instanceof WriteKey){
+                WriteKey di=(WriteKey)obj;
+                return di.file==file&&di.offset==offset;
+            }
+            return false;
+        }
 	}
 	
 	public class WriteBatch {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java Wed Jun 27 11:53:30 2007
@@ -131,5 +131,18 @@
 		}
 		return dataFileId - l.dataFileId;
 	}
+    
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof Location) {
+            result = compareTo((Location)o)==0;
+        }
+        return result;
+    }
+    
+    public int hashCode() {
+        return dataFileId ^ offset;
+    }
+
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java Wed Jun 27 11:53:30 2007
@@ -188,24 +188,26 @@
         }
     }
 
-    protected final void delete(IndexItem key,IndexItem prev,IndexItem next){
-        try{
-            dataManager.removeInterestInFile(key.getKeyFile());
-            dataManager.removeInterestInFile(key.getValueFile());
-            prev=prev==null?root:prev;
-            next=next!=root?next:null;
-            if(next!=null){
-                prev.setNextItem(next.getOffset());
-                next.setPreviousItem(prev.getOffset());
-                updateIndexes(next);
-            }else{
-                prev.setNextItem(Item.POSITION_NOT_SET);
+    protected final void delete(final IndexItem keyItem,final IndexItem prevItem,final IndexItem nextItem){
+        if(keyItem!=null){
+            try{
+                IndexItem prev=prevItem==null?root:prevItem;
+                IndexItem next=nextItem!=root?nextItem:null;
+                dataManager.removeInterestInFile(keyItem.getKeyFile());
+                dataManager.removeInterestInFile(keyItem.getValueFile());
+                if(next!=null){
+                    prev.setNextItem(next.getOffset());
+                    next.setPreviousItem(prev.getOffset());
+                    updateIndexes(next);
+                }else{
+                    prev.setNextItem(Item.POSITION_NOT_SET);
+                }
+                updateIndexes(prev);
+                indexManager.freeIndex(keyItem);
+            }catch(IOException e){
+                log.error("Failed to delete "+keyItem,e);
+                throw new RuntimeStoreException(e);
             }
-            updateIndexes(prev);
-            indexManager.freeIndex(key);
-        }catch(IOException e){
-            log.error("Failed to delete "+key,e);
-            throw new RuntimeStoreException(e);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java Wed Jun 27 11:53:30 2007
@@ -115,13 +115,14 @@
     }
     
     public String toString() {
-        String result ="ContainerKeySet[";
+        StringBuffer result =new StringBuffer(32);
+        result.append("ContainerKeySet[");
         IndexItem item = container.getInternalList().getRoot();
         while ((item = container.getInternalList().getNextEntry(item)) != null) {
-            result += container.getKey(item);
-            result += ",";
+            result.append(container.getKey(item));
+            result.append(",");
         }
-        result +="]";
-        return result;
+        result.append("]");
+        return result.toString();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Wed Jun 27 11:53:30 2007
@@ -121,6 +121,10 @@
         }
         return result;
     }
+    
+    public int hashCode() {
+        return super.hashCode();
+    }
 
     /*
      * (non-Javadoc)
@@ -158,13 +162,14 @@
     public synchronized Object removeFirst(){
         load();
         Object result=null;
-        IndexItem item=(IndexItem)indexList.getFirst();
+        IndexItem item=indexList.getFirst();
         if(item!=null){
             itemRemoved(0);
             result=getValue(item);
             IndexItem prev=root;
             IndexItem next=indexList.size()>1?(IndexItem)indexList.get(1):null;
             indexList.removeFirst();
+            
             delete(item,prev,next);
             item=null;
         }
@@ -306,6 +311,7 @@
         IndexItem prev=indexList.getPrevEntry(item);
         IndexItem next=indexList.getNextEntry(item);
         indexList.remove(item);
+        
         delete(item,prev,next);
     }
 
@@ -591,7 +597,6 @@
      */
     public synchronized ListIterator listIterator(){
         load();
-        IndexItem start= indexList.getFirst();
         return new ContainerListIterator(this,indexList,indexList.getRoot());
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Wed Jun 27 11:53:30 2007
@@ -497,13 +497,12 @@
     protected synchronized IndexItem write(Object key,Object value){
         IndexItem index=null;
         try{
-            if(key!=null){
-                index=indexManager.createNewIndex();
-                StoreLocation data=dataManager.storeDataItem(keyMarshaller,key);
-                index.setKeyData(data);
-            }
+            index=indexManager.createNewIndex();
+            StoreLocation data=dataManager.storeDataItem(keyMarshaller,key);
+            index.setKeyData(data);
+            
             if(value!=null){
-                StoreLocation data=dataManager.storeDataItem(valueMarshaller,value);
+                data=dataManager.storeDataItem(valueMarshaller,value);
                 index.setValueData(data);
             }
             IndexItem prev=indexList.getLast();           

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java Wed Jun 27 11:53:30 2007
@@ -38,7 +38,7 @@
 
     DataFile(File file,int number){
         this.file=file;
-        this.number=new Integer(number);
+        this.number=Integer.valueOf(number);
         length=file.exists()?file.length():0;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Wed Jun 27 11:53:30 2007
@@ -43,7 +43,7 @@
     private static final Log log=LogFactory.getLog(DataManagerImpl.class);
     public static final long MAX_FILE_LENGTH=1024*1024*32;
     private static final String NAME_PREFIX="data-";
-    private final File dir;
+    private final File directory;
     private final String name;
     private SyncDataFileReader reader;
     private SyncDataFileWriter writer;
@@ -59,14 +59,14 @@
     private String dataFilePrefix;
    
     public DataManagerImpl(File dir, final String name){
-        this.dir=dir;
+        this.directory=dir;
         this.name=name;
         
         dataFilePrefix = NAME_PREFIX+name+"-";
         // build up list of current dataFiles
         File[] files=dir.listFiles(new FilenameFilter(){
             public boolean accept(File dir,String n){
-                return dir.equals(dir)&&n.startsWith(dataFilePrefix);
+                return dir.equals(directory)&&n.startsWith(dataFilePrefix);
             }
         });
         if(files!=null){
@@ -86,7 +86,7 @@
     
     private DataFile createAndAddDataFile(int num){
         String fileName=dataFilePrefix+num;
-        File file=new File(dir,fileName);
+        File file=new File(directory,fileName);
         DataFile result=new DataFile(file,num);
         fileMap.put(result.getNumber(),result);
         return result;
@@ -114,7 +114,7 @@
     }
 
     DataFile getDataFile(StoreLocation item) throws IOException{
-        Integer key=new Integer(item.getFile());
+        Integer key=Integer.valueOf(item.getFile());
         DataFile dataFile=(DataFile) fileMap.get(key);
         if(dataFile==null){
             log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java Wed Jun 27 11:53:30 2007
@@ -297,12 +297,8 @@
         }
     }
 
-    private void doUnderFlow(int index) {
-        int pageNo = index / maximumEntries;
-        int nextPageNo = pageNo + 1;
-        if (nextPageNo < hashPages.size()) {
-        }
-        HashPageInfo info = hashPages.get(pageNo);
+    private void doUnderFlow(@SuppressWarnings("unused") int index) {
+        //does little
     }
 
     private void end() throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java Wed Jun 27 11:53:30 2007
@@ -43,7 +43,7 @@
         return compareTo(o)==0;
     }
 
-    public int hasCode(){
+    public int hashCode(){
         return key.hashCode();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java Wed Jun 27 11:53:30 2007
@@ -260,7 +260,7 @@
     public synchronized void delete() throws IOException{
         unload();
         if(file.exists()){
-            boolean result=file.delete();
+            file.delete();
         }
         length=0;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java Wed Jun 27 11:53:30 2007
@@ -225,9 +225,12 @@
 
     void dump() {
 
-        String str = this + ": ";
+        StringBuffer str = new StringBuffer(32);
+        str.append(toString());
+        str.append(": ");
         for (HashEntry entry : hashIndexEntries) {
-            str += entry + ",";
+            str .append(entry);
+            str.append(",");
         }
         log.info(str);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java Wed Jun 27 11:53:30 2007
@@ -45,7 +45,7 @@
         return compareTo(o)==0;
     }
 
-    public int hasCode(){
+    public int hashCode(){
         return key.hashCode();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java Wed Jun 27 11:53:30 2007
@@ -50,7 +50,7 @@
         }, "Cache Evictor: "+System.identityHashCode(this));
     }
     
-    private boolean evictMessages() {
+    boolean evictMessages() {
         // Try to take the memory usage down below the low mark.
         try {            
             log.debug("Evicting cache memory usage: "+usageManager.getPercentUsage());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Jun 27 11:53:30 2007
@@ -125,7 +125,7 @@
     /**
      * @throws IOException
      */
-    private void startBridge() throws IOException {
+    final void startBridge() throws IOException {
         connectionInfo = new ConnectionInfo();
         connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
         connectionInfo.setClientId(clientId);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java Wed Jun 27 11:53:30 2007
@@ -196,7 +196,7 @@
     protected class Synchronization implements javax.transaction.Synchronization {
         private final PooledSession session;
 
-        private Synchronization(PooledSession session) {
+        protected Synchronization(PooledSession session) {
             this.session = session;
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Jun 27 11:53:30 2007
@@ -168,18 +168,22 @@
         }
     }
 
-    public Response processAddDestination(DestinationInfo info)  {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
-        if( cs != null && info != null && info.getDestination().isTemporary() ) {
-            cs.addTempDestination(info);
+    public Response processAddDestination(DestinationInfo info){
+        if(info!=null){
+            ConnectionState cs=(ConnectionState)connectionStates.get(info.getConnectionId());
+            if(cs!=null&&info.getDestination().isTemporary()){
+                cs.addTempDestination(info);
+            }
         }
         return TRACKED_RESPONSE_MARKER;
     }
 
-    public Response processRemoveDestination(DestinationInfo info)  {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
-        if( cs != null && info != null && info.getDestination().isTemporary() ) {
-            cs.removeTempDestination(info.getDestination());
+    public Response processRemoveDestination(DestinationInfo info){
+        if(info!=null){
+            ConnectionState cs=(ConnectionState)connectionStates.get(info.getConnectionId());
+            if(cs!=null&&info.getDestination().isTemporary()){
+                cs.removeTempDestination(info.getDestination());
+            }
         }
         return TRACKED_RESPONSE_MARKER;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Jun 27 11:53:30 2007
@@ -130,7 +130,7 @@
         }
     }
 
-    private void addMessage(final Message message,final Location location) throws InterruptedIOException{
+    void addMessage(final Message message,final Location location) throws InterruptedIOException{
         ReferenceData data=new ReferenceData();
         data.setExpiration(message.getExpiration());
         data.setFileId(location.getDataFileId());
@@ -205,7 +205,7 @@
         }
     }
 
-    private void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
+    final void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
         ReferenceData data;
         synchronized(this){
             lastLocation=location;
@@ -273,7 +273,7 @@
      * @return
      * @throws IOException
      */
-    private void asyncWrite(){
+    void asyncWrite(){
         try{
             CountDownLatch countDown;
             synchronized(this){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Jun 27 11:53:30 2007
@@ -218,8 +218,10 @@
         if(!started.compareAndSet(true,false))
             return;
         this.usageManager.removeUsageListener(this);
-        Scheduler.cancel(periodicCheckpointTask);
-        Scheduler.cancel(periodicCleanupTask);
+        synchronized(this){
+            Scheduler.cancel(periodicCheckpointTask);
+            Scheduler.cancel(periodicCleanupTask);
+        }
         Iterator<AMQMessageStore> iterator=queues.values().iterator();
         while(iterator.hasNext()){
             AMQMessageStore ms=iterator.next();
@@ -232,7 +234,9 @@
         }
         // Take one final checkpoint and stop checkpoint processing.
         checkpoint(true);
-        checkpointTask.shutdown();
+        synchronized(this){
+            checkpointTask.shutdown();
+        }
         queues.clear();
         topics.clear();
         IOException firstException=null;
@@ -259,8 +263,8 @@
             CountDownLatch latch=null;
             synchronized(this){
                 latch=nextCheckpointCountDownLatch;
+                checkpointTask.wakeup();
             }
-            checkpointTask.wakeup();
             if(sync){
                 if(log.isDebugEnabled()){
                     log.debug("Waitng for checkpoint to complete.");
@@ -585,7 +589,7 @@
         return transactionStore;
     }
 
-    public void deleteAllMessages() throws IOException{
+    public synchronized void deleteAllMessages() throws IOException{
         deleteAllMessages=true;
     }
 
@@ -669,11 +673,11 @@
         this.maxCheckpointWorkers=maxCheckpointWorkers;
     }
 
-    public File getDirectory(){
+    public synchronized File getDirectory(){
         return directory;
     }
 
-    public void setDirectory(File directory){
+    public synchronized void setDirectory(File directory){
         this.directory=directory;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Jun 27 11:53:30 2007
@@ -143,7 +143,7 @@
      * @param key
      * @throws InterruptedIOException
      */
-    private void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
+    protected void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
         synchronized(this){
             lastLocation=location;
             ackedLastAckLocations.put(key,messageId);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Wed Jun 27 11:53:30 2007
@@ -125,7 +125,7 @@
         }
     }
 
-    private void addMessage(final Message message, final RecordLocation location) {
+    void addMessage(final Message message, final RecordLocation location) {
         synchronized (this) {
             lastLocation = location;
             MessageId id = message.getMessageId();
@@ -187,7 +187,7 @@
         }
     }
     
-    private void removeMessage(final MessageAck ack, final RecordLocation location) {
+    final void removeMessage(final MessageAck ack, final RecordLocation location) {
         synchronized (this) {
             lastLocation = location;
             MessageId id = ack.getLastMessageId();
@@ -253,33 +253,31 @@
                 ConnectionContext context = transactionTemplate.getContext();
                 
                 // Checkpoint the added messages.
-                Iterator iterator = cpAddedMessageIds.values().iterator();
-                while (iterator.hasNext()) {
-                    Message message = (Message) iterator.next();
-                    try {
-                        longTermStore.addMessage(context, message);
-                    } catch (Throwable e) {
-                        log.warn("Message could not be added to long term store: " + e.getMessage(), e);
+                synchronized(JournalMessageStore.this){
+                    Iterator iterator=cpAddedMessageIds.values().iterator();
+                    while(iterator.hasNext()){
+                        Message message=(Message)iterator.next();
+                        try{
+                            longTermStore.addMessage(context,message);
+                        }catch(Throwable e){
+                            log.warn("Message could not be added to long term store: "+e.getMessage(),e);
+                        }
+                        size+=message.getSize();
+                        message.decrementReferenceCount();
+                        // Commit the batch if it's getting too big
+                        if(size>=maxCheckpointMessageAddSize){
+                            persitanceAdapter.commitTransaction(context);
+                            persitanceAdapter.beginTransaction(context);
+                            size=0;
+                        }
                     }
-                    
-                    size += message.getSize();
-                    
-                    message.decrementReferenceCount();
-                    
-                    // Commit the batch if it's getting too big
-                    if( size >= maxCheckpointMessageAddSize ) {
-                        persitanceAdapter.commitTransaction(context);
-                        persitanceAdapter.beginTransaction(context);
-                        size=0;
-                    }
-                    
                 }
 
                 persitanceAdapter.commitTransaction(context);
                 persitanceAdapter.beginTransaction(context);
 
                 // Checkpoint the removed messages.
-                iterator = cpRemovedMessageLocations.iterator();
+                Iterator iterator = cpRemovedMessageLocations.iterator();
                 while (iterator.hasNext()) {
                     try {
                         MessageAck ack = (MessageAck) iterator.next();
@@ -303,7 +301,8 @@
         if( cpActiveJournalLocations.size() > 0 ) {
             Collections.sort(cpActiveJournalLocations);
             return (RecordLocation) cpActiveJournalLocations.get(0);
-        } else {
+        }
+        synchronized (this){
             return lastLocation;
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Jun 27 11:53:30 2007
@@ -92,8 +92,8 @@
     private final ConcurrentHashMap topics = new ConcurrentHashMap();
     
     private UsageManager usageManager;
-    private long checkpointInterval = 1000 * 60 * 5;
-    private long lastCheckpointRequest = System.currentTimeMillis();
+    long checkpointInterval = 1000 * 60 * 5;
+    long lastCheckpointRequest = System.currentTimeMillis();
     private long lastCleanup = System.currentTimeMillis();
     private int maxCheckpointWorkers = 10;
     private int maxCheckpointMessageAddSize = 1024*1024;
@@ -112,7 +112,11 @@
     final Runnable createPeriodicCheckpointTask() {
     	return new Runnable() {
 	        public void run() {
-	            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
+                long lastTime = 0;
+                synchronized(this) {
+                    lastTime = lastCheckpointRequest;
+                }
+	            if( System.currentTimeMillis()>lastTime+checkpointInterval ) {
 	                checkpoint(false, true);
 	            }
 	        }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Wed Jun 27 11:53:30 2007
@@ -142,7 +142,7 @@
      * @param location
      * @param key
      */
-    private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
+    protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
         synchronized(this) {
 		    lastLocation = location;
 		    ackedLastAckLocations.put(key, messageId);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Jun 27 11:53:30 2007
@@ -174,7 +174,7 @@
      * @param nextToDispatch
      * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
      */
-    public void resetBatching(){
+    public synchronized void resetBatching(){
         batchEntry=null;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Jun 27 11:53:30 2007
@@ -149,7 +149,7 @@
         messageContainer.clear();
     }
 
-    public void resetBatching(){
+    public synchronized void resetBatching(){
         batchEntry=null;
         lastBatchId=null;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Jun 27 11:53:30 2007
@@ -212,7 +212,7 @@
     }
     
     @Override
-    public void setDirectory(File directory){
+    public synchronized void setDirectory(File directory){
         File file = new File(directory,"data");
         super.setDirectory(file);
         this.stateStore=createStateStore(directory);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Wed Jun 27 11:53:30 2007
@@ -138,7 +138,7 @@
      * @param ack
      * @throws IOException
      */
-    private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{
+    final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{
         if(ack.isInTransaction()){
             KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
             tx.add((KahaMessageStore) destination,ack);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Wed Jun 27 11:53:30 2007
@@ -27,8 +27,7 @@
 public class TopicSubContainer {
     private transient ListContainer listContainer;
     private transient StoreEntry batchEntry;
-    private transient String lastBatchId;
-
+    
     public TopicSubContainer(ListContainer container) {
         this.listContainer = container;
     }
@@ -45,12 +44,10 @@
      * @param batchEntry the batchEntry to set
      */
     public void setBatchEntry(String id,StoreEntry batchEntry) {
-        this.lastBatchId=id;
         this.batchEntry = batchEntry;
     }
 
     public void reset() {
-        lastBatchId=null;
         batchEntry = null;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Wed Jun 27 11:53:30 2007
@@ -229,7 +229,7 @@
      * @param ack
      * @throws IOException
      */
-    private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
+    final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
         if( doingRecover )
             return;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java Wed Jun 27 11:53:30 2007
@@ -82,7 +82,7 @@
         shutdown(0);
     }
     
-    private void runTask() {
+    final void runTask() {
         
         try {
             while( true ) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java Wed Jun 27 11:53:30 2007
@@ -96,7 +96,7 @@
     public void shutdown() throws InterruptedException {
         shutdown(0);
     }
-    private void runTask() {
+    final void runTask() {
         
         synchronized (runable) {
             queued = false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Wed Jun 27 11:53:30 2007
@@ -68,7 +68,7 @@
     }
 
 
-    private void writeCheck() {
+    final void writeCheck() {
         synchronized(writeChecker) {
             if( inSend.get() ) {
                 log.trace("A send is in progress");
@@ -90,7 +90,7 @@
         }
     }
 
-    private void readCheck() {
+    final void readCheck() {
         synchronized(readChecker) {
             if( inReceive.get() ) {
                 log.trace("A receive is in progress");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java Wed Jun 27 11:53:30 2007
@@ -60,8 +60,6 @@
     private String group = "default";
     private final CopyOnWriteArrayList serviceInfos = new CopyOnWriteArrayList();
 
-    private String brokerName;
-
     // DiscoveryAgent interface
     //-------------------------------------------------------------------------
     public void start() throws Exception {
@@ -232,11 +230,16 @@
         return "_" + group+"."+TYPE_SUFFIX;
     }
 
-    public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;        
-    }
-
     public void serviceFailed(DiscoveryEvent event) throws IOException {
         // TODO: is there a way to notify the JmDNS that the service failed?
+    }
+
+    /**
+     * @param brokerName
+     * @see org.apache.activemq.transport.discovery.DiscoveryAgent#setBrokerName(java.lang.String)
+     */
+    public void setBrokerName(String brokerName){
+        // implementation of interface
+        
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Jun 27 11:53:30 2007
@@ -91,7 +91,7 @@
 	                return;
 	            }
 	            if (command.isResponse()) {
-                    Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+                    Object object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
                     if( object!=null && object.getClass() == Tracked.class ) {
                 	   ((Tracked)object).onResponses();
                     }
@@ -231,7 +231,7 @@
         }, "ActiveMQ Failover Worker: "+System.identityHashCode(this));
     }
 
-    private void handleTransportFailure(IOException e) throws InterruptedException {
+    final void handleTransportFailure(IOException e) throws InterruptedException {
         if (transportListener != null){
             transportListener.transportInterupted();
         }
@@ -382,9 +382,9 @@
                         // it later.
                         Tracked tracked = stateTracker.track(command);
                         if( tracked!=null && tracked.isWaitingForResponse() ) {
-                            requestMap.put(new Integer(command.getCommandId()), tracked);
+                            requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
                         } else if ( tracked==null && command.isResponseRequired()) {
-                            requestMap.put(new Integer(command.getCommandId()), command);
+                            requestMap.put(Integer.valueOf(command.getCommandId()), command);
                         }
                                                 
                         // Send the message.
@@ -398,7 +398,7 @@
                         		// since we will retry in this method.. take it out of the request
                         		// map so that it is not sent 2 times on recovery
                             	if( command.isResponseRequired() ) {
-                            		requestMap.remove(new Integer(command.getCommandId()));
+                            		requestMap.remove(Integer.valueOf(command.getCommandId()));
                             	}
                             	
                                 // Rethrow the exception so it will handled by the outer catch

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java Wed Jun 27 11:53:30 2007
@@ -19,6 +19,7 @@
 
 import javax.jms.Message;
 
+import java.io.Serializable;
 import java.util.Comparator;
 
 /**
@@ -26,7 +27,7 @@
  * 
  * @version $Revision$
  */
-public abstract class MessageComparatorSupport implements Comparator {
+public abstract class MessageComparatorSupport implements Comparator, Serializable {
 
     public int compare(Object object1, Object object2) {
         Message command1 = (Message) object1;
@@ -36,11 +37,20 @@
 
     protected abstract int compareMessages(Message message1, Message message2);
 
-    protected int compareComparators(Comparable comparable, Comparable comparable2) {
-        if (comparable != null) {
+    protected int compareComparators(final Comparable comparable, final Comparable comparable2) {
+        if (comparable == null && comparable2 == null) {
+            return 0;
+        }
+        else if (comparable != null) {
+            if (comparable2== null) {
+                return 1;
+            }
             return comparable.compareTo(comparable2);
         }
         else if (comparable2 != null) {
+            if (comparable== null) {
+                return -11;
+            }
             return comparable2.compareTo(comparable) * -1;
         }
         return 0;



Mime
View raw message