activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r834922 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/journal/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/apache/kahadb/journal/
Date Wed, 11 Nov 2009 15:50:46 GMT
Author: dejanb
Date: Wed Nov 11 15:50:45 2009
New Revision: 834922

URL: http://svn.apache.org/viewvc?rev=834922&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-2042 - shutdown if kahadb cannot access disk

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=834922&r1=834921&r2=834922&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Nov 11 15:50:45 2009
@@ -732,7 +732,7 @@
         	   try {
     	            brokerService.stop();
     	        } catch (Exception e) {
-    	            LOG.warn("Failure occured while stopping broker");
+    	            LOG.warn("Failure occured while stopping broker", e);
     	        }    			
     		}
     	}.start();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=834922&r1=834921&r2=834922&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Wed Nov 11 15:50:45 2009
@@ -17,6 +17,8 @@
 package org.apache.activemq.store.kahadb;
 
 import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -37,7 +39,7 @@
  * @org.apache.xbean.XBean element="kahaDB"
  * @version $Revision: 1.17 $
  */
-public class KahaDBPersistenceAdapter implements PersistenceAdapter {
+public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
     private KahaDBStore letter = new KahaDBStore();
     
 
@@ -364,4 +366,8 @@
     public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
         letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
     }
+
+	public void setBrokerService(BrokerService brokerService) {
+		letter.setBrokerService(brokerService);
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=834922&r1=834921&r2=834922&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Nov 11 15:50:45 2009
@@ -36,6 +36,8 @@
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -75,8 +77,11 @@
 import org.apache.kahadb.util.SequenceSet;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
+import org.springframework.core.enums.LetterCodedLabeledEnum;
 
-public class MessageDatabase {
+public class MessageDatabase implements BrokerServiceAware {
+	
+	private BrokerService brokerService;
 
     public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@@ -259,6 +264,9 @@
 	                    }
 	                } catch (InterruptedException e) {
 	                    // Looks like someone really wants us to exit this thread...
+	                } catch (IOException ioe) {
+	                	LOG.error("Checkpoint failed", ioe);
+	                	stopBroker();
 	                }
 	            }
 	        };
@@ -575,26 +583,22 @@
         return journal.getNextLocation(null);
 	}
 
-    protected void checkpointCleanup(final boolean cleanup) {
-        try {
-        	long start = System.currentTimeMillis();
-            synchronized (indexMutex) {
-            	if( !opened.get() ) {
-            		return;
-            	}
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, cleanup);
-                    }
-                });
-            }
-        	long end = System.currentTimeMillis();
-        	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-        		LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+    protected void checkpointCleanup(final boolean cleanup) throws IOException {
+    	long start = System.currentTimeMillis();
+        synchronized (indexMutex) {
+        	if( !opened.get() ) {
+        		return;
         	}
-        } catch (IOException e) {
-        	e.printStackTrace();
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    checkpointUpdate(tx, cleanup);
+                }
+            });
         }
+    	long end = System.currentTimeMillis();
+    	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+    		LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+    	}
     }
 
     
@@ -623,7 +627,6 @@
      * durring a recovery process.
      */
     public Location store(JournalCommand data, boolean sync) throws IOException {
-
     	
         int size = data.serializedSizeFramed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -1530,4 +1533,20 @@
     public void setChecksumJournalFiles(boolean checksumJournalFiles) {
         this.checksumJournalFiles = checksumJournalFiles;
     }
+
+	public void setBrokerService(BrokerService brokerService) {
+		this.brokerService = brokerService;
+	}
+	
+    protected void stopBroker() {
+        new Thread() {
+           public void run() {
+        	   try {
+    	            brokerService.stop();
+    	        } catch (Exception e) {
+    	            LOG.warn("Failure occured while stopping broker", e);
+    	        }    			
+    		}
+    	}.start();
+    }
 }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=834922&r1=834921&r2=834922&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Wed Nov 11 15:50:45 2009
@@ -158,7 +158,7 @@
      * @throws
      */
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-
+    	
         // Write the packet our internal buffer.
         int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
 
@@ -298,6 +298,7 @@
     protected void processQueue() {
         DataFile dataFile = null;
         RandomAccessFile file = null;
+        WriteBatch wb = null;
         try {
 
             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@@ -321,7 +322,7 @@
                     enqueueMutex.notify();
                 }
 
-                WriteBatch wb = (WriteBatch)o;
+                wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
                         file.setLength(dataFile.getLength());
@@ -403,6 +404,9 @@
                 wb.latch.countDown();
             }
         } catch (IOException e) {
+        	if (wb != null) {
+        		wb.latch.countDown();
+        	}
             synchronized (enqueueMutex) {
                 firstAsyncException = e;
             }



Mime
View raw message