activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r552738 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/t...
Date Tue, 03 Jul 2007 08:31:12 GMT
Author: rajdavies
Date: Tue Jul  3 01:31:10 2007
New Revision: 552738

URL: http://svn.apache.org/viewvc?view=rev&rev=552738
Log:
Added duplicate detection to the TransactionBroker - so can cope with rollbacks etc.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Tue
Jul  3 01:31:10 2007
@@ -250,4 +250,8 @@
      * @return the URI that can be used to connect to the local Broker
      */
     public URI getVmConnectorURI();
+    
+    public void brokerServiceStarted();
+    
+    BrokerService getBrokerService();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Tue Jul  3 01:31:10 2007
@@ -242,5 +242,12 @@
     public URI getVmConnectorURI(){
         return next.getVmConnectorURI();
     }
+
+    public void brokerServiceStarted(){
+        next.brokerServiceStarted();
+    }
     
+    public BrokerService getBrokerService(){
+        return next.getBrokerService();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Tue Jul  3 01:31:10 2007
@@ -153,6 +153,7 @@
     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
     private boolean useLocalHostBrokerName = false;
     private CountDownLatch stoppedLatch = new CountDownLatch(1);
+    private boolean supportFailOver = false;
 
     static{
         String localHostName = "localhost";
@@ -364,7 +365,7 @@
     /**
      * @return true if this Broker is a slave to a Master
      */
-    public boolean isSlave(){
+    public synchronized boolean isSlave(){
         return masterConnector != null && masterConnector.isSlave();
     }
     
@@ -436,6 +437,7 @@
      
             brokerId = broker.getBrokerId();
             log.info("ActiveMQ JMS Message Broker (" + getBrokerName()+", "+brokerId+") started");
+            getBroker().brokerServiceStarted();
         }
         catch (Exception e) {
             log.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
@@ -486,7 +488,6 @@
         VMTransportFactory.stopped(getBrokerName());
         stopped.set(true);
         stoppedLatch.countDown();
-
         log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped");
         stopper.throwFirstException();
     }
@@ -1104,6 +1105,20 @@
             brokerName=LOCAL_HOST_NAME;
         }
     }
+    
+    /**
+     * @return the supportFailOver
+     */
+    public boolean isSupportFailOver(){
+        return this.supportFailOver;
+    }
+
+    /**
+     * @param supportFailOver the supportFailOver to set
+     */
+    public void setSupportFailOver(boolean supportFailOver){
+        this.supportFailOver=supportFailOver;
+    }    
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -1649,6 +1664,7 @@
         }
         if (service instanceof MasterConnector) {
             masterConnector = (MasterConnector) service;
+            supportFailOver=true;
         }
     }
 
@@ -1675,8 +1691,6 @@
                 broker.addDestination(adminConnectionContext, destination);
             }
         }
-    }    
-
-    
+    }
    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Tue Jul  3 01:31:10 2007
@@ -241,5 +241,11 @@
     public URI getVmConnectorURI(){
         return null;
     }
-
+    
+    public void brokerServiceStarted(){
+    }
+    
+    public BrokerService getBrokerService(){
+        return null;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Tue Jul  3 01:31:10 2007
@@ -240,5 +240,12 @@
     public URI getVmConnectorURI(){
         throw new BrokerStoppedException(this.message);
     }
-
+    
+    public void brokerServiceStarted(){
+        throw new BrokerStoppedException(this.message);
+    }
+    
+    public BrokerService getBrokerService(){
+        throw new BrokerStoppedException(this.message);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Tue Jul  3 01:31:10 2007
@@ -254,5 +254,13 @@
     public URI getVmConnectorURI(){
         return getNext().getVmConnectorURI();
     }
+    
+    public void brokerServiceStarted(){
+        getNext().brokerServiceStarted();
+    } 
+    
+    public BrokerService getBrokerService(){
+        return getNext().getBrokerService();
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
Tue Jul  3 01:31:10 2007
@@ -19,6 +19,7 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.Message;
@@ -28,6 +29,7 @@
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.transaction.LocalTransaction;
+import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.transaction.Transaction;
 import org.apache.activemq.transaction.XATransaction;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -36,6 +38,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import javax.jms.JMSException;
+
 import javax.transaction.xa.XAException;
 
 import java.util.ArrayList;
@@ -55,6 +58,7 @@
     // The prepared XA transactions.
     private TransactionStore transactionStore;
     private Map xaTransactions = new LinkedHashMap();
+    ActiveMQMessageAudit audit;
 
     public TransactionBroker(Broker next, TransactionStore transactionStore) {
         super(next);
@@ -189,20 +193,41 @@
         }
     }
     
-    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception
{
-        // This method may be invoked recursively.  
+    public void send(ProducerBrokerExchange producerExchange,final Message message) throws
Exception{
+        // This method may be invoked recursively.
         // Track original tx so that it can be restored.
-        final ConnectionContext context = producerExchange.getConnectionContext();
-        Transaction originalTx = context.getTransaction();
+        final ConnectionContext context=producerExchange.getConnectionContext();
+        Transaction originalTx=context.getTransaction();
         Transaction transaction=null;
-        if( message.getTransactionId()!=null ) {
-            transaction = getTransaction(context, message.getTransactionId(), false);
+        Synchronization sync=null;
+        if(message.getTransactionId()!=null){
+            transaction=getTransaction(context,message.getTransactionId(),false);
+            if(transaction!=null){
+                sync=new Synchronization(){
+
+                    public void afterRollback(){
+                        if(audit!=null){
+                            audit.rollbackMessageReference(message);
+                        }
+                    }
+                };
+                transaction.addSynchronization(sync);
+            }
         }
-        context.setTransaction(transaction);
-        try {
-            next.send(producerExchange, message);
-        } finally {
-            context.setTransaction(originalTx);
+        if(audit==null||!audit.isDuplicateMessageReference(message)){
+            context.setTransaction(transaction);
+            try{
+                next.send(producerExchange,message);
+            }finally{
+                context.setTransaction(originalTx);
+            }
+        }else{
+            if(sync!=null&&transaction!=null){
+                transaction.removeSynchronization(sync);
+            }
+            if(log.isDebugEnabled()){
+                log.debug("IGNORING duplicate message "+message);
+            }
         }
     }
     
@@ -248,5 +273,12 @@
             xaTransactions.remove(xid);
         }
     }
+    
+    public synchronized void brokerServiceStarted(){
+        super.brokerServiceStarted();
+        if(getBrokerService().isSupportFailOver()&&audit==null){
+            audit=new ActiveMQMessageAudit();
+        }
+    } 
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Tue Jul  3 01:31:10 2007
@@ -479,24 +479,7 @@
     public Response processMessage(Message messageSend) throws Exception{
         ProducerId producerId=messageSend.getProducerId();
         ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId);
-        ProducerState producerState = null;
-        if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
-            producerState=producerExchange.getProducerState();
-        }
-        if(producerState!=null){
-            long seq=messageSend.getMessageId().getProducerSequenceId();
-            if(seq>producerState.getLastSequenceId()){
-                producerState.setLastSequenceId(seq);
-                broker.send(producerExchange,messageSend);
-            }else {
-                if (log.isDebugEnabled()) {
-                    log.debug("Discarding duplicate: " + messageSend);
-                }
-            }
-        }else{
-            // producer not local to this broker
-            broker.send(producerExchange,messageSend);
-        }
+        broker.send(producerExchange,messageSend);
         return null;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
Tue Jul  3 01:31:10 2007
@@ -132,6 +132,7 @@
         this.broker = broker;
         brokerInfo.setBrokerId(broker.getBrokerId());
         brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
+        brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
     }
 	
     public void setBrokerName(String brokerName) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
Tue Jul  3 01:31:10 2007
@@ -288,9 +288,11 @@
         mdn.setConsumerId(messageDispatch.getConsumerId());
         mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
         mdn.setDestination(messageDispatch.getDestination());
-        if(messageDispatch.getMessage()!=null)
-            mdn.setMessageId(messageDispatch.getMessage().getMessageId());
-        sendAsyncToSlave(mdn);
+        if(messageDispatch.getMessage()!=null){
+            Message msg=messageDispatch.getMessage();
+            mdn.setMessageId(msg.getMessageId());
+            sendAsyncToSlave(mdn);
+        }
         super.processDispatch(messageDispatch);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
Tue Jul  3 01:31:10 2007
@@ -55,146 +55,137 @@
  * 
  * @version $Revision$
  */
-public class MasterConnector implements Service, BrokerServiceAware {
+public class MasterConnector implements Service,BrokerServiceAware{
 
-    private static final Log log = LogFactory.getLog(MasterConnector.class);
+    private static final Log log=LogFactory.getLog(MasterConnector.class);
     private BrokerService broker;
     private URI remoteURI;
     private URI localURI;
     private Transport localBroker;
     private Transport remoteBroker;
     private TransportConnector connector;
-    private AtomicBoolean masterActive = new AtomicBoolean(false);
-    private AtomicBoolean started = new AtomicBoolean(false);
-    private final IdGenerator idGenerator = new IdGenerator();
+    private AtomicBoolean started=new AtomicBoolean(false);
+    private final IdGenerator idGenerator=new IdGenerator();
     private String userName;
     private String password;
     private ConnectionInfo connectionInfo;
     private SessionInfo sessionInfo;
     private ProducerInfo producerInfo;
+    final AtomicBoolean masterActive = new AtomicBoolean();
 
-    public MasterConnector() {
+    public MasterConnector(){
     }
 
-    public MasterConnector(String remoteUri) throws URISyntaxException {
-        remoteURI = new URI(remoteUri);
+    public MasterConnector(String remoteUri) throws URISyntaxException{
+        remoteURI=new URI(remoteUri);
     }
 
-    public void setBrokerService(BrokerService broker) {
-        this.broker = broker;
-        if (localURI == null) {
-            localURI = broker.getVmConnectorURI();
+    public void setBrokerService(BrokerService broker){
+        this.broker=broker;
+        if(localURI==null){
+            localURI=broker.getVmConnectorURI();
         }
-        if (connector == null) {
-            List transportConnectors = broker.getTransportConnectors();
-            if (!transportConnectors.isEmpty()) {
-                connector = (TransportConnector) transportConnectors.get(0);
+        if(connector==null){
+            List transportConnectors=broker.getTransportConnectors();
+            if(!transportConnectors.isEmpty()){
+                connector=(TransportConnector)transportConnectors.get(0);
             }
         }
     }
 
-    public boolean isSlave() {
+    public boolean isSlave(){
         return masterActive.get();
     }
 
-    public void start() throws Exception {
-        if (!started.compareAndSet(false, true)) {
+    public void start() throws Exception{
+        if(!started.compareAndSet(false,true)){
             return;
         }
-        if (remoteURI == null) {
+        if(remoteURI==null){
             throw new IllegalArgumentException("You must specify a remoteURI");
         }
-        localBroker = TransportFactory.connect(localURI);
-        remoteBroker = TransportFactory.connect(remoteURI);
-        log.info("Starting a network connection between " + localBroker + " and " + remoteBroker
+ " has been established.");
+        localBroker=TransportFactory.connect(localURI);
+        remoteBroker=TransportFactory.connect(remoteURI);
+        log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+"
has been established.");
+        localBroker.setTransportListener(new DefaultTransportListener(){
 
-        localBroker.setTransportListener(new DefaultTransportListener() {
-            public void onCommand(Object command) {
+            public void onCommand(Object command){
             }
 
-            public void onException(IOException error) {
-                if (started.get()) {
+            public void onException(IOException error){
+                if(started.get()){
                     serviceLocalException(error);
                 }
             }
         });
+        remoteBroker.setTransportListener(new DefaultTransportListener(){
 
-        remoteBroker.setTransportListener(new DefaultTransportListener() {
-            public void onCommand(Object o) {
-            	Command command = (Command) o;
-                if (started.get()) {
+            public void onCommand(Object o){
+                Command command=(Command)o;
+                if(started.get()){
                     serviceRemoteCommand(command);
                 }
             }
 
-            public void onException(IOException error) {
-                if (started.get()) {
+            public void onException(IOException error){
+                if(started.get()){
                     serviceRemoteException(error);
                 }
             }
         });
-
         masterActive.set(true);
-        Thread thead = new Thread() {
-            public void run() {
-                try {
+        Thread thead=new Thread(){
+
+            public void run(){
+                try{
                     localBroker.start();
                     remoteBroker.start();
                     startBridge();
-                }
-                catch (Exception e) {
+                }catch(Exception e){
                     masterActive.set(false);
-                    log.error("Failed to start network bridge: " + e, e);
+                    log.error("Failed to start network bridge: "+e,e);
                 }
             }
         };
         thead.start();
-
     }
 
-    protected void startBridge() throws Exception {
-        connectionInfo = new ConnectionInfo();
+    protected void startBridge() throws Exception{
+        connectionInfo=new ConnectionInfo();
         connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
         connectionInfo.setClientId(idGenerator.generateId());
         connectionInfo.setUserName(userName);
         connectionInfo.setPassword(password);
         localBroker.oneway(connectionInfo);
-        ConnectionInfo remoteInfo = new ConnectionInfo();
+        ConnectionInfo remoteInfo=new ConnectionInfo();
         connectionInfo.copy(remoteInfo);
         remoteInfo.setBrokerMasterConnector(true);
         remoteBroker.oneway(connectionInfo);
-
-        sessionInfo = new SessionInfo(connectionInfo, 1);
+        sessionInfo=new SessionInfo(connectionInfo,1);
         localBroker.oneway(sessionInfo);
         remoteBroker.oneway(sessionInfo);
-
-        producerInfo = new ProducerInfo(sessionInfo, 1);
+        producerInfo=new ProducerInfo(sessionInfo,1);
         producerInfo.setResponseRequired(false);
         remoteBroker.oneway(producerInfo);
-
-        BrokerInfo brokerInfo = null;
-        if (connector != null) {
-
-            brokerInfo = connector.getBrokerInfo();
-        }
-        else {
-            brokerInfo = new BrokerInfo();
+        BrokerInfo brokerInfo=null;
+        if(connector!=null){
+            brokerInfo=connector.getBrokerInfo();
+        }else{
+            brokerInfo=new BrokerInfo();
         }
         brokerInfo.setBrokerName(broker.getBrokerName());
         brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
         brokerInfo.setSlaveBroker(true);
         remoteBroker.oneway(brokerInfo);
-
-        log.info("Slave connection between " + localBroker + " and " + remoteBroker + " has
been established.");
+        log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been
established.");
     }
 
-    public void stop() throws Exception {
-        if (!started.compareAndSet(true, false)) {
+    public void stop() throws Exception{
+        if(!started.compareAndSet(true,false)){
             return;
         }
-
         masterActive.set(false);
-        try {
+        try{
             // if (connectionInfo!=null){
             // localBroker.request(connectionInfo.createRemoveCommand());
             // }
@@ -202,59 +193,56 @@
             // remoteBroker.setTransportListener(null);
             remoteBroker.oneway(new ShutdownInfo());
             localBroker.oneway(new ShutdownInfo());
-        }
-        catch (IOException e) {
-            log.debug("Caught exception stopping", e);
-        }
-        finally {
-            ServiceStopper ss = new ServiceStopper();
+        }catch(IOException e){
+            log.debug("Caught exception stopping",e);
+        }finally{
+            ServiceStopper ss=new ServiceStopper();
             ss.stop(localBroker);
             ss.stop(remoteBroker);
             ss.throwFirstException();
         }
     }
 
-    protected void serviceRemoteException(IOException error) {
-        log.error("Network connection between " + localBroker + " and " + remoteBroker +
" shutdown: " + error.getMessage(), error);
+    protected void serviceRemoteException(IOException error){
+        log
+                .error("Network connection between "+localBroker+" and "+remoteBroker+" shutdown:
"+error.getMessage(),
+                        error);
         shutDown();
     }
 
-    protected void serviceRemoteCommand(Command command) {
-        try {
-            if (command.isMessageDispatch()) {
-                MessageDispatch md = (MessageDispatch) command;
-                command = md.getMessage();
+    protected void serviceRemoteCommand(Command command){
+        try{
+            if(command.isMessageDispatch()){
+                MessageDispatch md=(MessageDispatch)command;
+                command=md.getMessage();
             }
-            if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
+            if(command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){
                 log.warn("The Master has shutdown");
                 shutDown();
-
-            }
-            else {
-                boolean responseRequired = command.isResponseRequired();
-                int commandId = command.getCommandId();
+            }else{
+                boolean responseRequired=command.isResponseRequired();
+                int commandId=command.getCommandId();
                 localBroker.oneway(command);
-                if (responseRequired) {
-                    Response response = new Response();
+                if(responseRequired){
+                    Response response=new Response();
                     response.setCorrelationId(commandId);
                     remoteBroker.oneway(response);
                 }
             }
-        }
-        catch (IOException e) {
+        }catch(IOException e){
             serviceRemoteException(e);
         }
     }
 
-    protected void serviceLocalException(Throwable error) {
-        log.info("Network connection between " + localBroker + " and " + remoteBroker + "
shutdown: " + error.getMessage(), error);
+    protected void serviceLocalException(Throwable error){
+        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown:
"+error.getMessage(),error);
         ServiceSupport.dispose(this);
     }
 
     /**
      * @return Returns the localURI.
      */
-    public URI getLocalURI() {
+    public URI getLocalURI(){
         return localURI;
     }
 
@@ -262,14 +250,14 @@
      * @param localURI
      *            The localURI to set.
      */
-    public void setLocalURI(URI localURI) {
-        this.localURI = localURI;
+    public void setLocalURI(URI localURI){
+        this.localURI=localURI;
     }
 
     /**
      * @return Returns the remoteURI.
      */
-    public URI getRemoteURI() {
+    public URI getRemoteURI(){
         return remoteURI;
     }
 
@@ -277,14 +265,14 @@
      * @param remoteURI
      *            The remoteURI to set.
      */
-    public void setRemoteURI(URI remoteURI) {
-        this.remoteURI = remoteURI;
+    public void setRemoteURI(URI remoteURI){
+        this.remoteURI=remoteURI;
     }
 
     /**
      * @return Returns the password.
      */
-    public String getPassword() {
+    public String getPassword(){
         return password;
     }
 
@@ -292,14 +280,14 @@
      * @param password
      *            The password to set.
      */
-    public void setPassword(String password) {
-        this.password = password;
+    public void setPassword(String password){
+        this.password=password;
     }
 
     /**
      * @return Returns the userName.
      */
-    public String getUserName() {
+    public String getUserName(){
         return userName;
     }
 
@@ -307,14 +295,13 @@
      * @param userName
      *            The userName to set.
      */
-    public void setUserName(String userName) {
-        this.userName = userName;
+    public void setUserName(String userName){
+        this.userName=userName;
     }
 
-    private void shutDown() {
+    private void shutDown(){
         masterActive.set(false);
         broker.masterFailed();
         ServiceSupport.dispose(this);
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Jul  3 01:31:10 2007
@@ -17,7 +17,6 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
@@ -55,7 +54,7 @@
     protected long enqueueCounter;
     protected long dispatchCounter;
     protected long dequeueCounter;
-    private AtomicBoolean dispatching=new AtomicBoolean();
+    
 
     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor
cursor)
             throws InvalidSelectorException{
@@ -207,7 +206,9 @@
             }
             // this only happens after a reconnect - get an ack which is not valid
             if(!callDispatchMatched){
-                log.info("Could not correlate acknowledgment with dispatched message: "+ack);
+                if (log.isDebugEnabled()) {
+                    log.debug("Could not correlate acknowledgment with dispatched message:
"+ack);
+                }
             }
         }else if(ack.isDeliveredAck()){
             // Message was delivered but not acknowledged: update pre-fetch counters.
@@ -376,35 +377,31 @@
     }
 
     protected synchronized void dispatchMatched() throws IOException{
-        if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){
+        if(!broker.isSlaveBroker()){
             try{
-                try{
-                    int numberToDispatch=countBeforeFull();
-                    if(numberToDispatch>0){
-                        pending.setMaxBatchSize(numberToDispatch);
-                        int count=0;
-                        pending.reset();
-                        while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
-                            MessageReference node=pending.next();
-                            if(node==null)
-                                break;
-                            if(canDispatch(node)){
-                                pending.remove();
-                                // Message may have been sitting in the pending list a while
-                                // waiting for the consumer to ak the message.
-                                if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
-                                    continue; // just drop it.
-                                }
-                                dispatch(node);
-                                count++;
+                int numberToDispatch=countBeforeFull();
+                if(numberToDispatch>0){
+                    pending.setMaxBatchSize(numberToDispatch);
+                    int count=0;
+                    pending.reset();
+                    while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
+                        MessageReference node=pending.next();
+                        if(node==null)
+                            break;
+                        if(canDispatch(node)){
+                            pending.remove();
+                            // Message may have been sitting in the pending list a while
+                            // waiting for the consumer to ak the message.
+                            if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
+                                continue; // just drop it.
                             }
+                            dispatch(node);
+                            count++;
                         }
                     }
-                }finally{
-                    pending.release();
                 }
             }finally{
-                dispatching.set(false);
+                pending.release();
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Tue Jul  3 01:31:10 2007
@@ -28,6 +28,7 @@
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
@@ -35,6 +36,7 @@
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransactionBroker;
 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@@ -63,6 +65,8 @@
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -74,7 +78,7 @@
  * @version $Revision$
  */
 public class RegionBroker implements Broker {
-
+    private static final Log log = LogFactory.getLog(RegionBroker.class);
     private static final IdGenerator brokerIdGenerator = new IdGenerator();
 
     private final Region queueRegion;
@@ -99,6 +103,7 @@
     private ConnectionContext adminConnectionContext;
     protected DestinationFactory destinationFactory;
     protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
+   
     
         
     public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory,
UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor
destinationInterceptor) throws IOException {
@@ -378,26 +383,26 @@
         topicRegion.removeSubscription(context, info);
     }
 
-    public void send(ProducerBrokerExchange producerExchange,  Message message) throws Exception
{
-        long si = sequenceGenerator.getNextSequenceId();
+    public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{
+        long si=sequenceGenerator.getNextSequenceId();
         message.getMessageId().setBrokerSequenceId(si);
-        if (producerExchange.isMutable() || producerExchange.getRegion()==null) {
-            ActiveMQDestination destination = message.getDestination();
-            //ensure the destination is registered with the RegionBroker
+        if(producerExchange.isMutable()||producerExchange.getRegion()==null){
+            ActiveMQDestination destination=message.getDestination();
+            // ensure the destination is registered with the RegionBroker
             addDestination(producerExchange.getConnectionContext(),destination);
-            Region region = null;
-            switch(destination.getDestinationType()) {
+            Region region=null;
+            switch(destination.getDestinationType()){
             case ActiveMQDestination.QUEUE_TYPE:
-                region = queueRegion;
+                region=queueRegion;
                 break;
             case ActiveMQDestination.TOPIC_TYPE:
-                region = topicRegion;
+                region=topicRegion;
                 break;
             case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                region = tempQueueRegion;
+                region=tempQueueRegion;
                 break;
             case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                region = tempTopicRegion;
+                region=tempTopicRegion;
                 break;
             default:
                 throw createUnknownDestinationTypeException(destination);
@@ -613,4 +618,13 @@
     public URI getVmConnectorURI(){
         return brokerService.getVmConnectorURI();
     }
+
+    public void brokerServiceStarted(){
+    }
+
+    public BrokerService getBrokerService(){
+        return brokerService;
+    }
+    
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
Tue Jul  3 01:31:10 2007
@@ -33,10 +33,5 @@
     public ProducerInfo getInfo() {
         return info;
     }
-	public void setLastSequenceId(long lastSequenceId) {
-		this.lastSequenceId = lastSequenceId;		
-	}
-	public long getLastSequenceId() {
-		return lastSequenceId;
-	}        
+	       
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
Tue Jul  3 01:31:10 2007
@@ -55,6 +55,10 @@
             state = IN_USE_STATE;
         }
     }
+    
+    public void removeSynchronization(Synchronization r) {
+        synchronizations.remove(r);
+    }
 
     public void prePrepare() throws Exception {
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=552738&r1=552737&r2=552738
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
Tue Jul  3 01:31:10 2007
@@ -236,4 +236,12 @@
     public URI getVmConnectorURI(){
         return null;
     }
+
+    public void brokerServiceStarted(){        
+    }
+
+    
+    public BrokerService getBrokerService(){
+        return null;
+    }
 }



Mime
View raw message