activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r504032 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor: KahaMessageStore.java KahaReferenceStore.java
Date Tue, 06 Feb 2007 08:40:15 GMT
Author: rajdavies
Date: Tue Feb  6 00:40:14 2007
New Revision: 504032

URL: http://svn.apache.org/viewvc?view=rev&rev=504032
Log:
make sure next entry will be set properly in recovery

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=504032&r1=504031&r2=504032
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Tue Feb  6 00:40:14 2007
@@ -36,9 +36,9 @@
     protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId,Message> messageContainer;
     protected StoreEntry batchEntry=null;
-    
 
-    public KahaMessageStore(MapContainer<MessageId,Message> container,ActiveMQDestination destination) throws IOException{
+    public KahaMessageStore(MapContainer<MessageId,Message> container,ActiveMQDestination destination)
+            throws IOException{
         this.messageContainer=container;
         this.destination=destination;
     }
@@ -110,7 +110,7 @@
     /**
      * @param usageManager The UsageManager that is controlling the destination's memory usage.
      */
-    public void setUsageManager(UsageManager usageManager) {
+    public void setUsageManager(UsageManager usageManager){
     }
 
     /**
@@ -146,6 +146,9 @@
         }else{
             entry=messageContainer.refresh(entry);
             entry=messageContainer.getNext(entry);
+            if(entry==null){
+                batchEntry=null;
+            }
         }
         if(entry!=null){
             int count=0;
@@ -176,6 +179,4 @@
     public boolean isSupportForCursors(){
         return true;
     }
-
-    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=504032&r1=504031&r2=504032
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Tue Feb  6 00:40:14 2007
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.kahadaptor;
 
 import java.io.IOException;
@@ -30,46 +27,40 @@
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.ReferenceStore;
 
-public class KahaReferenceStore implements ReferenceStore {
+public class KahaReferenceStore implements ReferenceStore{
 
     protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
     protected StoreEntry batchEntry=null;
-	
 
-	public KahaReferenceStore(MapContainer container, ActiveMQDestination destination) throws IOException {
+    public KahaReferenceStore(MapContainer container,ActiveMQDestination destination) throws IOException{
         this.messageContainer=container;
         this.destination=destination;
-		
-	}
-    
+    }
+
     public void start(){
     }
 
     public void stop(){
     }
 
-	
-	protected MessageId getMessageId(Object object) {
-		return new MessageId(((ReferenceRecord)object).messageId);
-	}
-
-	
-	public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
-		throw new RuntimeException("Use addMessageReference instead");
-	}
-	
-	
-	public synchronized Message getMessage(MessageId identity) throws IOException {
-		throw new RuntimeException("Use addMessageReference instead");
-	}
-	
-	
-	protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
-		ReferenceRecord record = (ReferenceRecord) msg;
-		listener.recoverMessageReference(new MessageId(record.messageId));
-	}
-    
+    protected MessageId getMessageId(Object object){
+        return new MessageId(((ReferenceRecord)object).messageId);
+    }
+
+    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+        throw new RuntimeException("Use addMessageReference instead");
+    }
+
+    public synchronized Message getMessage(MessageId identity) throws IOException{
+        throw new RuntimeException("Use addMessageReference instead");
+    }
+
+    protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
+        ReferenceRecord record=(ReferenceRecord)msg;
+        listener.recoverMessageReference(new MessageId(record.messageId));
+    }
+
     public synchronized void recover(MessageRecoveryListener listener) throws Exception{
         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
             ReferenceRecord record=messageContainer.getValue(entry);
@@ -77,7 +68,7 @@
         }
         listener.finished();
     }
-    
+
     public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
         StoreEntry entry=batchEntry;
         if(entry==null){
@@ -85,6 +76,9 @@
         }else{
             entry=messageContainer.refresh(entry);
             entry=messageContainer.getNext(entry);
+            if (entry==null) {
+                batchEntry=null;
+            }
         }
         if(entry!=null){
             int count=0;
@@ -100,38 +94,38 @@
         }
         listener.finished();
     }
-	
-	public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
-		ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+
+    public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
+            throws IOException{
+        ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
         messageContainer.put(messageId,record);
-	}
+    }
 
-	public ReferenceData getMessageReference(MessageId identity) throws IOException {
-		
-		ReferenceRecord result=messageContainer.get(identity);
-        if( result == null )
-        	return null;
+    public ReferenceData getMessageReference(MessageId identity) throws IOException{
+        ReferenceRecord result=messageContainer.get(identity);
+        if(result==null)
+            return null;
         return result.data;
-	}
+    }
 
-	public void addReferenceFileIdsInUse(Set<Integer> rc) {
-        for (StoreEntry entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
-        	ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
+    public void addReferenceFileIdsInUse(Set<Integer> rc){
+        for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
+            ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
             rc.add(msg.data.getFileId());
         }
-	}
-    
+    }
+
     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
         removeMessage(ack.getLastMessageId());
     }
-    
+
     public synchronized void removeMessage(MessageId msgId) throws IOException{
         messageContainer.remove(msgId);
         if(messageContainer.isEmpty()){
             resetBatching();
         }
     }
-    
+
     public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
         messageContainer.clear();
     }
@@ -143,21 +137,19 @@
     public synchronized void delete(){
         messageContainer.clear();
     }
-    
+
     public void resetBatching(){
         batchEntry=null;
     }
-    
+
     public int getMessageCount(){
         return messageContainer.size();
     }
-    
-    public void setUsageManager(UsageManager usageManager) {
+
+    public void setUsageManager(UsageManager usageManager){
     }
-    
+
     public boolean isSupportForCursors(){
         return true;
     }
-
-
 }



Mime
View raw message