activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r480924 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/region/cursors/ store/jdbc/ store/journal/ store/kahadaptor/ store/memory/
Date Thu, 30 Nov 2006 13:46:10 GMT
Author: rajdavies
Date: Thu Nov 30 05:46:08 2006
New Revision: 480924

URL: http://svn.apache.org/viewvc?view=rev&rev=480924
Log:
log which Persistence Adaptor we are using

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=480924&r1=480923&r2=480924
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java	Thu Nov 30 05:46:08 2006
@@ -385,7 +385,7 @@
             startDestinations();
             
             addShutdownHook();
-            log.info("Using Persistence Adaptor " + getPersistenceAdapter());
+            log.info("Using Persistence Adaptor: " + getPersistenceAdapter());
             if (deleteAllMessagesOnStartup) {
                 deleteAllMessages();
             }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=480924&r1=480923&r2=480924
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java	Thu Nov 30 05:46:08 2006
@@ -59,7 +59,7 @@
         }
         nonPersistent.start();
         persistent.start();
-        pendingCount=persistent.size();
+        pendingCount=persistent.size() + nonPersistent.size();
     }
 
     public synchronized void stop() throws Exception{
@@ -87,12 +87,28 @@
             }
         }
     }
+    
+    public void addMessageFirst(MessageReference node) throws Exception{
+        if(node!=null){
+            Message msg=node.getMessage();
+            if(started){
+                pendingCount++;
+                if(!msg.isPersistent()){
+                    nonPersistent.addMessageFirst(node);
+                }
+            }
+            if(msg.isPersistent()){
+                persistent.addMessageFirst(node);
+            }
+        }
+    }
 
     public void clear(){
         pendingCount=0;
     }
 
     public synchronized boolean hasNext(){
+        
         boolean result=pendingCount>0;
         if(result){
             try{
@@ -107,7 +123,8 @@
     }
 
     public synchronized MessageReference next(){
-        return currentCursor!=null?currentCursor.next():null;
+        MessageReference result = currentCursor!=null?currentCursor.next():null;
+        return result;
     }
 
     public synchronized void remove(){
@@ -118,6 +135,11 @@
     }
 
     public void remove(MessageReference node){
+        if (!node.isPersistent()) {
+            nonPersistent.remove(node);
+        }else {
+            persistent.remove(node);
+        }
         pendingCount--;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java?view=diff&rev=480924&r1=480923&r2=480924
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java	Thu Nov 30 05:46:08 2006
@@ -88,5 +88,9 @@
         ds.setCreateDatabase("create");
         return ds;
     }
+    
+    public String toString(){
+        return ""+dataSource;
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?view=diff&rev=480924&r1=480923&r2=480924
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java	Thu Nov 30 05:46:08 2006
@@ -482,4 +482,8 @@
     protected DatabaseLocker createDatabaseLocker() throws IOException {
         return new DefaultDatabaseLocker(getDataSource(), getStatements());
     }
+    
+    public String toString(){
+        return "JDBCPersistenceAdaptor("+super.toString()+")";
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=480924&r1=480923&r2=480924
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java	Thu Nov 30 05:46:08 2006
@@ -669,5 +669,9 @@
     	org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
     	return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
     }
+    
+    public String toString(){
+        return "JournalPersistenceAdapator(" + longTermPersistence + ")";
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=480924&r1=480923&r2=480924
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java	Thu Nov 30 05:46:08 2006
@@ -270,4 +270,8 @@
         String name=dir.getAbsolutePath()+File.separator+"kahadb";
         return name;
     }
+    
+    public String toString(){
+        return "KahaPersistenceAdapter(" + getStoreName() +")";
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=480924&r1=480923&r2=480924
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java	Thu Nov 30 05:46:08 2006
@@ -50,12 +50,16 @@
     }
 
     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        messageTable.put(message.getMessageId(),message);
+        synchronized(messageTable){
+            messageTable.put(message.getMessageId(),message);
+        }
     }
 
     public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
             throws IOException{
-        messageTable.put(messageId,messageRef);
+        synchronized(messageTable){
+            messageTable.put(messageId,messageRef);
+        }
     }
 
     public Message getMessage(MessageId identity) throws IOException{
@@ -67,11 +71,16 @@
     }
 
     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
-        messageTable.remove(ack.getLastMessageId());
+        removeMessage(ack.getLastMessageId());
     }
 
     public void removeMessage(MessageId msgId) throws IOException{
-        messageTable.remove(msgId);
+        synchronized(messageTable){
+            messageTable.remove(msgId);
+            if(lastBatchId!=null&lastBatchId.equals(msgId)){
+                lastBatchId=null;
+            }
+        }
     }
 
     public void recover(MessageRecoveryListener listener) throws Exception{
@@ -96,7 +105,9 @@
     }
 
     public void removeAllMessages(ConnectionContext context) throws IOException{
-        messageTable.clear();
+        synchronized(messageTable){
+            messageTable.clear();
+        }
     }
 
     public ActiveMQDestination getDestination(){
@@ -104,7 +115,9 @@
     }
 
     public void delete(){
-        messageTable.clear();
+        synchronized(messageTable){
+            messageTable.clear();
+        }
     }
 
     /**
@@ -117,18 +130,16 @@
         return messageTable.size();
     }
 
-    
     public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
         synchronized(messageTable){
-            
             boolean pastLackBatch=lastBatchId==null;
-            int count = 0;
+            int count=0;
             for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
                 Map.Entry entry=(Entry)iter.next();
                 if(pastLackBatch){
                     count++;
                     Object msg=entry.getValue();
-                    lastBatchId = (MessageId)entry.getKey();
+                    lastBatchId=(MessageId)entry.getKey();
                     if(msg.getClass()==String.class){
                         listener.recoverMessageReference((String)msg);
                     }else{
@@ -143,6 +154,6 @@
     }
 
     public void resetBatching(){
-        lastBatchId = null;
+        lastBatchId=null;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?view=diff&rev=480924&r1=480923&r2=480924
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java	Thu Nov 30 05:46:08 2006
@@ -154,4 +154,8 @@
      */
     public void setUsageManager(UsageManager usageManager) {
     }
+    
+    public String toString(){
+        return "MemoryPersistenceAdapter";
+    }
 }



Mime
View raw message