activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r835373 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor: KahaPersistenceAdapter.java KahaTransactionStore.java
Date Thu, 12 Nov 2009 12:56:29 GMT
Author: dejanb
Date: Thu Nov 12 12:56:29 2009
New Revision: 835373

URL: http://svn.apache.org/viewvc?rev=835373&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-2042 - fix for kaha persistence store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=835373&r1=835372&r2=835373&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
Thu Nov 12 12:56:29 2009
@@ -24,6 +24,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -54,7 +56,7 @@
  * @org.apache.xbean.XBean
  * @version $Revision: 1.4 $
  */
-public class KahaPersistenceAdapter implements PersistenceAdapter {
+public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
 
     private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
@@ -73,6 +75,7 @@
     private boolean initialized;
     private final AtomicLong storeSize;
     private boolean persistentIndex = true;
+    private BrokerService brokerService;
 
     
     public KahaPersistenceAdapter(AtomicLong size) {
@@ -175,6 +178,7 @@
                     container.setValueMarshaller(new TransactionMarshaller(wireFormat));
                     container.load();
                     transactionStore = new KahaTransactionStore(this, container);
+                    transactionStore.setBrokerService(brokerService);
                     break;
                 } catch (StoreLockedExcpetion e) {
                     LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
@@ -361,6 +365,10 @@
             wireFormat.setTightEncodingEnabled(true);
         }
     }
+
+	public void setBrokerService(BrokerService brokerService) {
+		this.brokerService = brokerService;
+	}
   
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=835373&r1=835372&r2=835373&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
Thu Nov 12 12:56:29 2009
@@ -24,17 +24,23 @@
 
 import javax.transaction.xa.XAException;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.kaha.RuntimeStoreException;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.journal.JournalPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Provides a TransactionStore implementation that can create transaction aware
@@ -42,10 +48,14 @@
  * 
  * @version $Revision: 1.4 $
  */
-public class KahaTransactionStore implements TransactionStore {
+public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {	
+    private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class);
+	
     private Map transactions = new ConcurrentHashMap();
     private Map prepared;
     private KahaPersistenceAdapter adaptor;
+    
+    private BrokerService brokerService;
 
     KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
         this.adaptor = adaptor;
@@ -130,12 +140,17 @@
      * @throws IOException
      */
     void addMessage(final MessageStore destination, final Message message) throws IOException
{
-        if (message.isInTransaction()) {
-            KahaTransaction tx = getOrCreateTx(message.getTransactionId());
-            tx.add((KahaMessageStore)destination, message);
-        } else {
-            destination.addMessage(null, message);
-        }
+    	try {
+    		if (message.isInTransaction()) {
+    			KahaTransaction tx = getOrCreateTx(message.getTransactionId());
+    			tx.add((KahaMessageStore)destination, message);
+    		} else {
+    			destination.addMessage(null, message);
+    		}
+    	} catch (RuntimeStoreException rse) {
+    	    stopBroker();
+    	    throw rse;
+    	}
     }
 
     /**
@@ -143,12 +158,17 @@
      * @throws IOException
      */
     final void removeMessage(final MessageStore destination, final MessageAck ack) throws
IOException {
-        if (ack.isInTransaction()) {
-            KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
-            tx.add((KahaMessageStore)destination, ack);
-        } else {
-            destination.removeMessage(null, ack);
-        }
+    	try {
+    		if (ack.isInTransaction()) {
+    			KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
+    			tx.add((KahaMessageStore)destination, ack);
+    		} else {
+    			destination.removeMessage(null, ack);
+    		}
+    	} catch (RuntimeStoreException rse) {
+    	    stopBroker();
+    	    throw rse;
+    	}
     }
 
     protected synchronized KahaTransaction getTx(TransactionId key) {
@@ -181,4 +201,20 @@
     protected MessageStore getStoreById(Object id) {
         return adaptor.retrieveMessageStore(id);
     }
+
+	public void setBrokerService(BrokerService brokerService) {
+		this.brokerService = brokerService;
+	}
+	
+    protected void stopBroker() {
+        new Thread() {
+           public void run() {
+        	   try {
+    	            brokerService.stop();
+    	        } catch (Exception e) {
+    	            LOG.warn("Failure occured while stopping broker", e);
+    	        }    			
+    		}
+    	}.start();
+    }
 }



Mime
View raw message