activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r513921 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: amq/ kahadaptor/
Date Fri, 02 Mar 2007 20:00:13 GMT
Author: rajdavies
Date: Fri Mar  2 12:00:11 2007
New Revision: 513921

URL: http://svn.apache.org/viewvc?view=rev&rev=513921
Log:
Fixed Queue cursor test case for AMQ Store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Fri Mar  2 12:00:11 2007
@@ -68,8 +68,9 @@
     protected HashSet<Location> inFlightTxLocations=new HashSet<Location>();
     protected final TaskRunner asyncWriteTask;
     protected CountDownLatch flushLatch;
+    private final boolean debug=log.isDebugEnabled();
     private final AtomicReference<Location> mark=new AtomicReference<Location>();
-
+    
     public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination
destination){
         this.peristenceAdapter=adapter;
         this.transactionStore=adapter.getTransactionStore();
@@ -95,7 +96,7 @@
      */
     public void addMessage(ConnectionContext context,final Message message) throws IOException{
         final MessageId id=message.getMessageId();
-        final boolean debug=log.isDebugEnabled();
+        
         final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired());
         if(!context.isInTransaction()){
             if(debug)
@@ -168,7 +169,6 @@
     /**
      */
     public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
-        final boolean debug=log.isDebugEnabled();
         JournalQueueAck remove=new JournalQueueAck();
         remove.setDestination(destination);
         remove.setMessageAck(ack);
@@ -450,6 +450,7 @@
     }
 
     public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws
Exception{
+        /*
         RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
         if(referenceStore.supportsExternalBatchControl()){
             synchronized(this){
@@ -469,6 +470,13 @@
                 }
             }
         }else{
+            flush();
+            referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+        }
+        */
+        RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
+        referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+        if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
             flush();
             referenceStore.recoverNextMessages(maxReturned,recoveryListener);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
Fri Mar  2 12:00:11 2007
@@ -51,9 +51,7 @@
     public void recoverMessageReference(MessageId ref) throws Exception{
         Message message=this.store.getMessage(ref);
         if(message!=null){
-            listener.recoverMessage(message);
-            count++;
-            lastRecovered=ref;
+            recoverMessage(message);
         }else{
             log.error("Message id "+ref+" could not be recovered from the data store!");
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Fri Mar  2 12:00:11 2007
@@ -32,7 +32,7 @@
     protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
     protected KahaReferenceStoreAdapter adapter;
-    protected StoreEntry batchEntry=null;
+    private StoreEntry batchEntry=null;
 
     public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination
destination) throws IOException{
         this.adapter = adapter;
@@ -47,7 +47,7 @@
     }
 
     protected MessageId getMessageId(Object object){
-        return new MessageId(((ReferenceRecord)object).messageId);
+        return new MessageId(((ReferenceRecord)object).getMessageId());
     }
 
     public synchronized void addMessage(ConnectionContext context,Message message) throws
IOException{
@@ -60,13 +60,13 @@
 
     protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
         ReferenceRecord record=(ReferenceRecord)msg;
-        listener.recoverMessageReference(new MessageId(record.messageId));
+        listener.recoverMessageReference(new MessageId(record.getMessageId()));
     }
 
     public synchronized void recover(MessageRecoveryListener listener) throws Exception{
         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
             ReferenceRecord record=messageContainer.getValue(entry);
-            recover(listener,new MessageId(record.messageId));
+            recover(listener,new MessageId(record.getMessageId()));
         }
         listener.finished();
     }
@@ -78,9 +78,6 @@
         }else{
             entry=messageContainer.refresh(entry);
             entry=messageContainer.getNext(entry);
-            if (entry==null) {
-                batchEntry=null;
-            }
         }
         if(entry!=null){
             int count=0;
@@ -108,7 +105,7 @@
         ReferenceRecord result=messageContainer.get(identity);
         if(result==null)
             return null;
-        return result.data;
+        return result.getData();
     }
 
     public void addReferenceFileIdsInUse(){
@@ -123,10 +120,12 @@
     }
 
     public synchronized void removeMessage(MessageId msgId) throws IOException{
-        ReferenceRecord rr = messageContainer.remove(msgId);
-        removeInterest(rr);
-        if(messageContainer.isEmpty()){
-            resetBatching();
+        ReferenceRecord rr=messageContainer.remove(msgId);
+        if(rr!=null){
+            removeInterest(rr);
+            if(messageContainer.isEmpty()){
+                resetBatching();
+            }
         }
     }
 
@@ -157,27 +156,23 @@
         return true;
     }
 
-    /**
-     * @param startAfter
-     * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
-     */
-    public void setBatch(MessageId startAfter){
-        resetBatching();
-        if (startAfter != null) {
-            batchEntry = messageContainer.getEntry(startAfter);
-        }
-        
-    }
-
+    
     public boolean supportsExternalBatchControl(){
         return true;
     }
     
     void removeInterest(ReferenceRecord rr) {
-        adapter.removeInterestInRecordFile(rr.data.getFileId());
+        adapter.removeInterestInRecordFile(rr.getData().getFileId());
     }
     
     void addInterest(ReferenceRecord rr) {
-        adapter.addInterestInRecordFile(rr.data.getFileId());
+        adapter.addInterestInRecordFile(rr.getData().getFileId());
+    }
+
+    /**
+     * @param startAfter
+     * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
+     */
+    public void setBatch(MessageId startAfter){        
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Fri Mar  2 12:00:11 2007
@@ -53,7 +53,7 @@
     }
 
     protected MessageId getMessageId(Object object){
-        return new MessageId(((ReferenceRecord)object).messageId);
+        return new MessageId(((ReferenceRecord)object).getMessageId());
     }
 
     public synchronized void addMessage(ConnectionContext context,Message message) throws
IOException{
@@ -66,7 +66,7 @@
 
     protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
         ReferenceRecord record=(ReferenceRecord)msg;
-        listener.recoverMessageReference(new MessageId(record.messageId));
+        listener.recoverMessageReference(new MessageId(record.getMessageId()));
     }
 
     public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData
data)
@@ -94,7 +94,7 @@
         ReferenceRecord result=messageContainer.get(identity);
         if(result==null)
             return null;
-        return result.data;
+        return result.getData();
     }
 
     public void addReferenceFileIdsInUse(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
Fri Mar  2 12:00:11 2007
@@ -18,8 +18,8 @@
 
 public class ReferenceRecord{
 
-    public String messageId;
-    public ReferenceData data;
+    private String messageId;
+    private ReferenceData data;
 
     public ReferenceRecord(){
     }
@@ -27,5 +27,37 @@
     public ReferenceRecord(String messageId,ReferenceData data){
         this.messageId=messageId;
         this.data=data;
+    }
+
+    
+    /**
+     * @return the data
+     */
+    public ReferenceData getData(){
+        return this.data;
+    }
+
+    
+    /**
+     * @param data the data to set
+     */
+    public void setData(ReferenceData data){
+        this.data=data;
+    }
+
+    
+    /**
+     * @return the messageId
+     */
+    public String getMessageId(){
+        return this.messageId;
+    }
+
+    
+    /**
+     * @param messageId the messageId to set
+     */
+    public void setMessageId(String messageId){
+        this.messageId=messageId;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
Fri Mar  2 12:00:11 2007
@@ -24,11 +24,12 @@
 
     public ReferenceRecord readPayload(DataInput dataIn) throws IOException{
         ReferenceRecord rr=new ReferenceRecord();
-        rr.messageId=dataIn.readUTF();
-        rr.data=new ReferenceData();
-        rr.data.setFileId(dataIn.readInt());
-        rr.data.setOffset(dataIn.readInt());
-        rr.data.setExpiration(dataIn.readLong());
+        rr.setMessageId(dataIn.readUTF());
+        ReferenceData referenceData = new ReferenceData();
+        referenceData.setFileId(dataIn.readInt());
+        referenceData.setOffset(dataIn.readInt());
+        referenceData.setExpiration(dataIn.readLong());
+        rr.setData(referenceData);
         return rr;
     }
 
@@ -39,9 +40,9 @@
      * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
      */
     public void writePayload(ReferenceRecord rr,DataOutput dataOut) throws IOException{
-        dataOut.writeUTF(rr.messageId);
-        dataOut.writeInt(rr.data.getFileId());
-        dataOut.writeInt(rr.data.getOffset());
-        dataOut.writeLong(rr.data.getExpiration());
+        dataOut.writeUTF(rr.getMessageId());
+        dataOut.writeInt(rr.getData().getFileId());
+        dataOut.writeInt(rr.getData().getOffset());
+        dataOut.writeLong(rr.getData().getExpiration());
     }
 }



Mime
View raw message