activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r586580 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/acti...
Date Fri, 19 Oct 2007 19:01:11 GMT
Author: rajdavies
Date: Fri Oct 19 12:01:10 2007
New Revision: 586580

URL: http://svn.apache.org/viewvc?rev=586580&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1452 and
https://issues.apache.org/activemq/browse/AMQ-729

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
Fri Oct 19 12:01:10 2007
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq;
 
-import java.util.Map;
-
 import javax.jms.JMSException;
 import javax.jms.Message;
 
@@ -37,8 +35,9 @@
 
     private static final int DEFAULT_WINDOW_SIZE = 1024;
     private static final int MAXIMUM_PRODUCER_COUNT = 128;
-    private int windowSize;
-    private Map<Object, BitArrayBin> map;
+    private int auditDepth;
+    private int maximumNumberOfProducersToTrack;
+    private LRUCache<Object, BitArrayBin> map;
 
     /**
      * Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack =
@@ -51,13 +50,44 @@
     /**
      * Construct a MessageAudit
      * 
-     * @param windowSize range of ids to track
+     * @param auditDepth range of ids to track
      * @param maximumNumberOfProducersToTrack number of producers expected in
      *                the system
      */
-    public ActiveMQMessageAudit(int windowSize, final int maximumNumberOfProducersToTrack)
{
-        this.windowSize = windowSize;
-        map = new LRUCache<Object, BitArrayBin>(maximumNumberOfProducersToTrack, maximumNumberOfProducersToTrack,
0.75f, true);
+    public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack)
{
+        this.auditDepth = auditDepth;
+        this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
+        this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack,
0.75f, true);
+    }
+    
+    /**
+     * @return the auditDepth
+     */
+    public int getAuditDepth() {
+        return auditDepth;
+    }
+
+    /**
+     * @param auditDepth the auditDepth to set
+     */
+    public void setAuditDepth(int auditDepth) {
+        this.auditDepth = auditDepth;
+    }
+
+    /**
+     * @return the maximumNumberOfProducersToTrack
+     */
+    public int getMaximumNumberOfProducersToTrack() {
+        return maximumNumberOfProducersToTrack;
+    }
+
+    /**
+     * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
+     */
+    public void setMaximumNumberOfProducersToTrack(
+            int maximumNumberOfProducersToTrack) {
+        this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
+        this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
     }
 
     /**
@@ -67,7 +97,7 @@
      * @return true if the message is a duplicate
      * @throws JMSException
      */
-    public boolean isDuplicateMessage(Message message) throws JMSException {
+    public boolean isDuplicate(Message message) throws JMSException {
         return isDuplicate(message.getJMSMessageID());
     }
 
@@ -84,7 +114,7 @@
         if (seed != null) {
             BitArrayBin bab = map.get(seed);
             if (bab == null) {
-                bab = new BitArrayBin(windowSize);
+                bab = new BitArrayBin(auditDepth);
                 map.put(seed, bab);
             }
             long index = IdGenerator.getSequenceFromId(id);
@@ -101,9 +131,9 @@
      * @param message
      * @return true if the message is a duplicate
      */
-    public boolean isDuplicateMessageReference(final MessageReference message) {
+    public boolean isDuplicate(final MessageReference message) {
         MessageId id = message.getMessageId();
-        return isDuplicateMessageId(id);
+        return isDuplicate(id);
     }
     
     /**
@@ -112,7 +142,7 @@
      * @param id
      * @return true if the message is a duplicate
      */
-    public synchronized boolean isDuplicateMessageId(final MessageId id) {
+    public synchronized boolean isDuplicate(final MessageId id) {
         boolean answer = false;
         
         if (id != null) {
@@ -120,7 +150,7 @@
             if (pid != null) {
                 BitArrayBin bab = map.get(pid);
                 if (bab == null) {
-                    bab = new BitArrayBin(windowSize);
+                    bab = new BitArrayBin(auditDepth);
                     map.put(pid, bab);
                 }
                 answer = bab.setBit(id.getProducerSequenceId(), true);
@@ -134,9 +164,9 @@
      * 
      * @param message
      */
-    public void rollbackMessageReference(final MessageReference message) {
+    public void rollback(final MessageReference message) {
         MessageId id = message.getMessageId();
-        rollbackMessageId(id);
+        rollback(id);
     }
     
     /**
@@ -144,7 +174,7 @@
      * 
      * @param id
      */
-    public synchronized void rollbackMessageId(final  MessageId id) {
+    public synchronized void rollback(final  MessageId id) {
         if (id != null) {
             ProducerId pid = id.getProducerId();
             if (pid != null) {
@@ -155,4 +185,58 @@
             }
         }
     }
+    
+    /**
+     * Check the message is in order
+     * @param msg
+     * @return
+     * @throws JMSException
+     */
+    public boolean isInOrder(Message msg) throws JMSException {
+        return isInOrder(msg.getJMSMessageID());
+    }
+    
+    /**
+     * Check the message id is in order
+     * @param id
+     * @return
+     */
+    public synchronized boolean isInOrder(final String id) {
+        boolean answer = true;
+        
+        if (id != null) {
+            String seed = IdGenerator.getSeedFromId(id);
+            if (seed != null) {
+                BitArrayBin bab = map.get(seed);
+                if (bab != null) {
+                    long index = IdGenerator.getSequenceFromId(id);
+                    answer = bab.isInOrder(index);
+                }
+               
+            }
+        }
+        return answer;
+    }
+    
+    /**
+     * Check the MessageId is in order
+     * @param id
+     * @return
+     */
+    public synchronized boolean isInOrder(final MessageId id) {
+        boolean answer = true;
+        
+        if (id != null) {
+            ProducerId pid = id.getProducerId();
+            if (pid != null) {
+                BitArrayBin bab = map.get(pid);
+                if (bab != null) {
+                    answer = bab.isInOrder(id.getProducerSequenceId());
+                }
+               
+            }
+        }
+        return answer;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java Fri
Oct 19 12:01:10 2007
@@ -44,7 +44,7 @@
                         audit = new ActiveMQMessageAudit();
                         destinations.put(destination, audit);
                     }
-                    boolean result = audit.isDuplicateMessageReference(message);
+                    boolean result = audit.isDuplicate(message);
                     return result;
                 }
                 ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
@@ -52,7 +52,7 @@
                     audit = new ActiveMQMessageAudit();
                     dispatchers.put(dispatcher, audit);
                 }
-                boolean result = audit.isDuplicateMessageReference(message);
+                boolean result = audit.isDuplicate(message);
                 return result;
             }
         }
@@ -66,12 +66,12 @@
                 if (destination.isQueue()) {
                     ActiveMQMessageAudit audit = destinations.get(destination);
                     if (audit != null) {
-                        audit.rollbackMessageReference(message);
+                        audit.rollback(message);
                     }
                 } else {
                     ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
                     if (audit != null) {
-                        audit.rollbackMessageReference(message);
+                        audit.rollback(message);
                     }
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
Fri Oct 19 12:01:10 2007
@@ -211,14 +211,14 @@
 
                     public void afterRollback() {
                         if (audit != null) {
-                            audit.rollbackMessageReference(message);
+                            audit.rollback(message);
                         }
                     }
                 };
                 transaction.addSynchronization(sync);
             }
         }
-        if (audit == null || !audit.isDuplicateMessageReference(message)) {
+        if (audit == null || !audit.isDuplicate(message)) {
             context.setTransaction(transaction);
             try {
                 next.send(producerExchange, message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Fri Oct 19 12:01:10 2007
@@ -23,12 +23,57 @@
 public abstract class BaseDestination implements Destination {
 
     private boolean producerFlowControl = true;
-
+    private int maxProducersToAudit=1024;
+    private int maxAuditDepth=1;
+    private boolean enableAudit=true;
+    /**
+     * @return the producerFlowControl
+     */
     public boolean isProducerFlowControl() {
-        return this.producerFlowControl;
+        return producerFlowControl;
     }
-
-    public void setProducerFlowControl(boolean value) {
-        this.producerFlowControl = value;
+    /**
+     * @param producerFlowControl the producerFlowControl to set
+     */
+    public void setProducerFlowControl(boolean producerFlowControl) {
+        this.producerFlowControl = producerFlowControl;
+    }
+    /**
+     * @return the maxProducersToAudit
+     */
+    public int getMaxProducersToAudit() {
+        return maxProducersToAudit;
+    }
+    /**
+     * @param maxProducersToAudit the maxProducersToAudit to set
+     */
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        this.maxProducersToAudit = maxProducersToAudit;
+    }
+    /**
+     * @return the maxAuditDepth
+     */
+    public int getMaxAuditDepth() {
+        return maxAuditDepth;
     }
+    /**
+     * @param maxAuditDepth the maxAuditDepth to set
+     */
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+    }
+    /**
+     * @return the enableAudit
+     */
+    public boolean isEnableAudit() {
+        return enableAudit;
+    }
+    /**
+     * @param enableAudit the enableAudit to set
+     */
+    public void setEnableAudit(boolean enableAudit) {
+        this.enableAudit = enableAudit;
+    }
+
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Oct 19 12:01:10 2007
@@ -142,6 +142,9 @@
         if (store != null) {
             // Restore the persistent messages.
             messages.setSystemUsage(systemUsage);
+            messages.setEnableAudit(isEnableAudit());
+            messages.setMaxAuditDepth(getMaxAuditDepth());
+            messages.setMaxProducersToAudit(getMaxProducersToAudit());
             if (messages.isRecoveryRequired()) {
                 store.recover(new MessageRecoveryListener() {
 
@@ -442,7 +445,7 @@
         }
     }
 
-    void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
throws IOException, Exception {
+    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final
Message message) throws IOException, Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         message.setRegionDestination(this);
         if (store != null && message.isPersistent()) {
@@ -567,7 +570,7 @@
         doPageIn(false);
     }
 
-    public void stop() throws Exception {
+    public void stop() throws Exception{
         if (taskRunner != null) {
             taskRunner.shutdown();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Fri Oct 19 12:01:10 2007
@@ -17,9 +17,12 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.util.LinkedList;
+
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.SystemUsage;
 
 /**
@@ -32,11 +35,21 @@
     protected int memoryUsageHighWaterMark = 90;
     protected int maxBatchSize = 100;
     protected SystemUsage systemUsage;
-
-    public void start() throws Exception {
+    protected int maxProducersToAudit=1024;
+    protected int maxAuditDepth=1;
+    protected boolean enableAudit=true;
+    protected ActiveMQMessageAudit audit;
+    private boolean started=false;
+
+    public synchronized void start() throws Exception  {
+        if (!started && enableAudit && audit==null) {
+            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+        }
+        started=true;
     }
 
-    public void stop() throws Exception {
+    public synchronized void stop() throws Exception  {
+        started=false;
         gc();
     }
 
@@ -168,4 +181,68 @@
     public LinkedList pageInList(int maxItems) {
         throw new RuntimeException("Not supported");
     }
+
+    /**
+     * @return the maxProducersToAudit
+     */
+    public int getMaxProducersToAudit() {
+        return maxProducersToAudit;
+    }
+
+    /**
+     * @param maxProducersToAudit the maxProducersToAudit to set
+     */
+    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
+        this.maxProducersToAudit = maxProducersToAudit;
+        if (audit != null) {
+            this.audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
+        }
+    }
+
+    /**
+     * @return the maxAuditDepth
+     */
+    public int getMaxAuditDepth() {
+        return this.maxAuditDepth;
+    }
+    
+
+    /**
+     * @param maxAuditDepth the maxAuditDepth to set
+     */
+    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+        if (audit != null) {
+            this.audit.setAuditDepth(maxAuditDepth);
+        }
+    }
+    
+    
+    /**
+     * @return the enableAudit
+     */
+    public boolean isEnableAudit() {
+        return this.enableAudit;
+    }
+
+    /**
+     * @param enableAudit the enableAudit to set
+     */
+    public synchronized void setEnableAudit(boolean enableAudit) {
+        this.enableAudit = enableAudit;
+        if (this.enableAudit && started && audit==null) {
+            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+        }
+    }
+
+
+    protected synchronized boolean  isDuplicate(MessageId messageId) {
+        if (!this.enableAudit || this.audit==null) {
+            return false;
+        }
+        return this.audit.isDuplicate(messageId);
+    }
+
+   
+   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Fri Oct 19 12:01:10 2007
@@ -63,17 +63,18 @@
         this.store = store;
     }
 
-    public void start() {
+    public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
+            super.start();
             if (systemUsage != null) {
                 systemUsage.getMemoryUsage().addUsageListener(this);
             }
         }
     }
 
-    public void stop() {
+    public void stop() throws Exception {
         if (started.compareAndSet(true, false)) {
-            gc();
+            super.stop();
             if (systemUsage != null) {
                 systemUsage.getMemoryUsage().removeUsageListener(this);
             }
@@ -118,7 +119,7 @@
         }
     }
 
-    public synchronized void destroy() {
+    public synchronized void destroy() throws Exception {
         stop();
         for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
             Message node = (Message)i.next();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Fri Oct 19 12:01:10 2007
@@ -210,5 +210,37 @@
      * @return a list of paged in messages
      */
     LinkedList pageInList(int maxItems);
+    
+    /**
+     * set the maximum number of producers to track at one time
+     * @param value
+     */
+    void setMaxProducersToAudit(int value);
+    
+    /**
+     * @return the maximum number of producers to audit
+     */
+    int getMaxProducersToAudit();
+    
+    /**
+     * Set the maximum depth of message ids to track
+     * @param depth 
+     */
+    void setMaxAuditDepth(int depth);
+    
+    /**
+     * @return the audit depth
+     */
+    int getMaxAuditDepth();
+    
+    /**
+     * @return the enableAudit
+     */
+    public boolean isEnableAudit();
+    /**
+     * @param enableAudit the enableAudit to set
+     */
+    public void setEnableAudit(boolean enableAudit);
+
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Fri Oct 19 12:01:10 2007
@@ -29,7 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * perist pending messages pending message (messages awaiting disptach to a
+ * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
  * 
  * @version $Revision: 474985 $
@@ -42,6 +42,7 @@
     private final LinkedList<Message> batchList = new LinkedList<Message>();
     private Destination regionDestination;
     private int size;
+    private boolean fillBatchDuplicates;
 
     /**
      * @param topic
@@ -55,13 +56,14 @@
 
     }
 
-    public void start() throws Exception {
+    public void start() throws Exception{
+        super.start();
         store.resetBatching();
     }
 
     public void stop() throws Exception {
         store.resetBatching();
-        gc();
+        super.stop();
     }
 
     /**
@@ -127,10 +129,18 @@
     public void finished() {
     }
 
-    public boolean recoverMessage(Message message) throws Exception {
-        message.setRegionDestination(regionDestination);
-        message.incrementReferenceCount();
-        batchList.addLast(message);
+    public synchronized boolean recoverMessage(Message message)
+            throws Exception {
+        if (!isDuplicate(message.getMessageId())) {
+            message.setRegionDestination(regionDestination);
+            message.incrementReferenceCount();
+            batchList.addLast(message);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ignoring batched duplicated from store: " + message);
+            }
+            fillBatchDuplicates=true;
+        }
         return true;
     }
 
@@ -153,8 +163,13 @@
     }
 
     // implementation
-    protected void fillBatch() throws Exception {
+    protected synchronized void fillBatch() throws Exception {
         store.recoverNextMessages(maxBatchSize, this);
+        while (fillBatchDuplicates && batchList.isEmpty()) {
+            fillBatchDuplicates=false;
+            store.recoverNextMessages(maxBatchSize, this);
+        }
+        fillBatchDuplicates=false;
     }
 
     public String toString() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Fri Oct 19 12:01:10 2007
@@ -69,6 +69,7 @@
     public synchronized void start() throws Exception {
         if (!started) {
             started = true;
+            super.start();
             for (PendingMessageCursor tsp : storePrefetches) {
                 tsp.start();
             }
@@ -78,6 +79,7 @@
     public synchronized void stop() throws Exception {
         if (started) {
             started = false;
+            super.stop();
             for (PendingMessageCursor tsp : storePrefetches) {
                 tsp.stop();
             }
@@ -96,6 +98,9 @@
             TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId,
subscriberName, subscription);
             tsp.setMaxBatchSize(getMaxBatchSize());
             tsp.setSystemUsage(systemUsage);
+            tsp.setEnableAudit(isEnableAudit());
+            tsp.setMaxAuditDepth(getMaxAuditDepth());
+            tsp.setMaxProducersToAudit(getMaxProducersToAudit());
             topics.put(destination, tsp);
             storePrefetches.add(tsp);
             if (started) {
@@ -251,6 +256,36 @@
         for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();)
{
             PendingMessageCursor tsp = i.next();
             tsp.setSystemUsage(usageManager);
+        }
+    }
+    
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        super.setMaxProducersToAudit(maxProducersToAudit);
+        for (PendingMessageCursor cursor : storePrefetches) {
+            cursor.setMaxAuditDepth(maxAuditDepth);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
+        }
+    }
+
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        super.setMaxAuditDepth(maxAuditDepth);
+        for (PendingMessageCursor cursor : storePrefetches) {
+            cursor.setMaxAuditDepth(maxAuditDepth);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setMaxAuditDepth(maxAuditDepth);
+        }
+    }
+    
+    public synchronized void setEnableAudit(boolean enableAudit) {
+        super.setEnableAudit(enableAudit);
+        for (PendingMessageCursor cursor : storePrefetches) {
+            cursor.setEnableAudit(enableAudit);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setEnableAudit(enableAudit);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Fri Oct 19 12:01:10 2007
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
@@ -55,10 +56,14 @@
 
     public synchronized void start() throws Exception {
         started = true;
+        super.start();
         if (nonPersistent == null) {
             nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore);
             nonPersistent.setMaxBatchSize(getMaxBatchSize());
             nonPersistent.setSystemUsage(systemUsage);
+            nonPersistent.setEnableAudit(isEnableAudit());
+            nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
+            nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
         }
         nonPersistent.start();
         persistent.start();
@@ -67,6 +72,7 @@
 
     public synchronized void stop() throws Exception {
         started = false;
+        super.stop();
         if (nonPersistent != null) {
             nonPersistent.stop();
             nonPersistent.gc();
@@ -191,6 +197,39 @@
         }
         super.setMaxBatchSize(maxBatchSize);
     }
+    
+    
+    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
+        super.setMaxProducersToAudit(maxProducersToAudit);
+        if (persistent != null) {
+            persistent.setMaxProducersToAudit(maxProducersToAudit);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
+        }
+    }
+
+    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
+        super.setMaxAuditDepth(maxAuditDepth);
+        if (persistent != null) {
+            persistent.setMaxAuditDepth(maxAuditDepth);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setMaxAuditDepth(maxAuditDepth);
+        }
+    }
+    
+    public synchronized void setEnableAudit(boolean enableAudit) {
+        super.setEnableAudit(enableAudit);
+        if (persistent != null) {
+            persistent.setEnableAudit(enableAudit);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setEnableAudit(enableAudit);
+        }
+    }
+
+
 
     public synchronized void gc() {
         if (persistent != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Fri Oct 19 12:01:10 2007
@@ -65,16 +65,18 @@
         this.subscriberName = subscriberName;
     }
 
-    public synchronized void start() {
+    public synchronized void start() throws Exception {
         if (!started) {
             started = true;
+            super.start();
             safeFillBatch();
         }
     }
 
-    public synchronized void stop() {
+    public synchronized void stop() throws Exception {
         if (started) {
             started = false;
+            super.stop();
             store.resetBatching(clientId, subscriberName);
             gc();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Fri Oct 19 12:01:10 2007
@@ -51,6 +51,9 @@
     private PendingQueueMessageStoragePolicy pendingQueuePolicy;
     private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
     private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
+    private int maxProducersToAudit=1024;
+    private int maxAuditDepth=1;
+    private boolean enableAudit=true;
     private boolean producerFlowControl = true;
 
     public void configure(Queue queue, Store tmpStore) {
@@ -69,6 +72,9 @@
             queue.setMessages(messages);
         }
         queue.setProducerFlowControl(isProducerFlowControl());
+        queue.setEnableAudit(isEnableAudit());
+        queue.setMaxAuditDepth(getMaxAuditDepth());
+        queue.setMaxProducersToAudit(getMaxProducersToAudit());
     }
 
     public void configure(Topic topic) {
@@ -86,6 +92,9 @@
             topic.getBrokerMemoryUsage().setLimit(memoryLimit);
         }
         topic.setProducerFlowControl(isProducerFlowControl());
+        topic.setEnableAudit(isEnableAudit());
+        topic.setMaxAuditDepth(getMaxAuditDepth());
+        topic.setMaxProducersToAudit(getMaxProducersToAudit());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription)
{
@@ -266,12 +275,60 @@
         this.pendingSubscriberPolicy = pendingSubscriberPolicy;
     }
 
+    /**
+     * @return true if producer flow control enabled
+     */
     public boolean isProducerFlowControl() {
         return producerFlowControl;
     }
 
+    /**
+     * @param producerFlowControl
+     */
     public void setProducerFlowControl(boolean producerFlowControl) {
         this.producerFlowControl = producerFlowControl;
+    }
+
+    /**
+     * @return the maxProducersToAudit
+     */
+    public int getMaxProducersToAudit() {
+        return maxProducersToAudit;
+    }
+
+    /**
+     * @param maxProducersToAudit the maxProducersToAudit to set
+     */
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        this.maxProducersToAudit = maxProducersToAudit;
+    }
+
+    /**
+     * @return the maxAuditDepth
+     */
+    public int getMaxAuditDepth() {
+        return maxAuditDepth;
+    }
+
+    /**
+     * @param maxAuditDepth the maxAuditDepth to set
+     */
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+    }
+
+    /**
+     * @return the enableAudit
+     */
+    public boolean isEnableAudit() {
+        return enableAudit;
+    }
+
+    /**
+     * @param enableAudit the enableAudit to set
+     */
+    public void setEnableAudit(boolean enableAudit) {
+        this.enableAudit = enableAudit;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Fri
Oct 19 12:01:10 2007
@@ -29,6 +29,7 @@
     private int maxNumberOfArrays;
     private int firstIndex = -1;
     private int firstBin = -1;
+    private long lastBitSet=-1;
 
     /**
      * Create a BitArrayBin to a certain window size (number of messages to
@@ -60,8 +61,25 @@
             if (offset >= 0) {
                 answer = ba.set(offset, value);
             }
+            if (value) {
+                lastBitSet=index;
+            }else {
+                lastBitSet=-1;
+            }
         }
         return answer;
+    }
+    
+    /**
+     * Test if in order
+     * @param index
+     * @return true if next message is in order
+     */
+    public boolean isInOrder(long index) {
+        if (lastBitSet== -1) {
+            return true;
+        }
+        return lastBitSet+1==index;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
Fri Oct 19 12:01:10 2007
@@ -88,10 +88,32 @@
             ActiveMQMessage msg = new ActiveMQMessage();
             msg.setMessageId(id);
             list.add(msg);
-            assertFalse(audit.isDuplicateMessageReference(msg));
+            assertFalse(audit.isDuplicate(msg.getMessageId()));
         }
         for (MessageReference msg : list) {
-            assertTrue(audit.isDuplicateMessageReference(msg));
+            assertTrue(audit.isDuplicate(msg));
+        }
+    }
+    
+    public void testIsInOrderString() {
+        int count = 10000;
+        ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+        IdGenerator idGen = new IdGenerator();
+        // add to a list
+        List<String> list = new ArrayList<String>();
+        for (int i = 0; i < count; i++) {
+            String id = idGen.generateId();
+            if (i==0) {
+                assertFalse(audit.isDuplicate(id));
+            }
+            if (i > 1 && i%2 != 0) {
+                list.add(id);
+            }
+          
+        }
+        for (String id : list) {
+            assertFalse(audit.isInOrder(id));
+            assertFalse(audit.isDuplicate(id));
         }
     }
 }



Mime
View raw message