activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r613230 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/ broker/region/ broker/region/policy/
Date Fri, 18 Jan 2008 19:16:17 GMT
Author: rajdavies
Date: Fri Jan 18 11:16:15 2008
New Revision: 613230

URL: http://svn.apache.org/viewvc?rev=613230&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1553

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=613230&r1=613229&r2=613230&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Fri Jan 18 11:16:15 2008
@@ -137,7 +137,7 @@
         // Don't advise advisory topics.
         if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination()))
{
             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
-            fireAdvisory(context, topic, info);
+            fireProducerAdvisory(context, info.getDestination(), topic, info);
             producers.put(info.getProducerId(), info);
         }
     }
@@ -282,8 +282,7 @@
             Set<Destination> set = getDestinations(producerDestination);
             if (set != null) {
                 for (Destination dest : set) {
-                    count += dest.getDestinationStatistics().getConsumers()
-                            .getCount();
+                    count += dest.getDestinationStatistics().getProducers().getCount();
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java?rev=613230&r1=613229&r2=613230&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
Fri Jan 18 11:16:15 2008
@@ -112,6 +112,7 @@
                 return n.intValue();
             }
             LOG.warn("No producerCount header available on the message: " + message);
+            Thread.dumpStack();
         } catch (Exception e) {
             LOG.warn("Failed to extract producerCount from message: " + message + ".Reason:
" + e, e);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=613230&r1=613229&r2=613230&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Fri Jan 18 11:16:15 2008
@@ -16,8 +16,13 @@
  */
 package org.apache.activemq.broker.region;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 
 
 /**
@@ -25,11 +30,40 @@
  */
 public abstract class BaseDestination implements Destination {
 
+    protected final ActiveMQDestination destination;
+    protected final Broker broker;
+    protected final MessageStore store;
+    protected final SystemUsage systemUsage;
+    protected final MemoryUsage memoryUsage;
     private boolean producerFlowControl = true;
     private int maxProducersToAudit=1024;
     private int maxAuditDepth=1;
     private boolean enableAudit=true;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
+    
+    /**
+     * @param broker 
+     * @param store 
+     * @param destination
+     * @param systemUsage 
+     * @param parentStats
+     */
+    public BaseDestination(Broker broker,MessageStore store,ActiveMQDestination destination,
SystemUsage systemUsage,DestinationStatistics parentStats) {
+        this.broker=broker;
+        this.store=store;
+        this.destination=destination;
+        this.systemUsage=systemUsage;
+        this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
+        this.memoryUsage.setUsagePortion(1.0f);
+        // Let the store know what usage manager we are using so that he can
+        // flush messages to disk when usage gets high.
+        if (store != null) {
+            store.setMemoryUsage(this.memoryUsage);
+        } 
+        // let's copy the enabled property from the parent DestinationStatistics
+        this.destinationStatistics.setEnabled(parentStats.isEnabled());
+        this.destinationStatistics.setParent(parentStats);        
+    }
     /**
      * @return the producerFlowControl
      */
@@ -86,6 +120,31 @@
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
         destinationStatistics.getProducers().decrement();
     }
+    
+    public final MemoryUsage getBrokerMemoryUsage() {
+        return memoryUsage;
+    }
+
+    public DestinationStatistics getDestinationStatistics() {
+        return destinationStatistics;
+    }
+
+    public ActiveMQDestination getActiveMQDestination() {
+        return destination;
+    }
+
+    public final String getDestination() {
+        return destination.getPhysicalName();
+    }
+    
+    public final String getName() {
+        return getActiveMQDestination().getPhysicalName();
+    }
+    
+    public final MessageStore getMessageStore() {
+        return store;
+    }
+
 
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=613230&r1=613229&r2=613230&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Jan 18 11:16:15 2008
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -58,7 +59,6 @@
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
@@ -71,26 +71,16 @@
  * @version $Revision: 1.28 $
  */
 public class Queue extends BaseDestination implements Task {
-
-    final Broker broker;
-
+    private static int MAXIMUM_PAGE_SIZE  = 1000;
     private final Log log;
-    private final ActiveMQDestination destination;
     private final List<Subscription> consumers = new ArrayList<Subscription>(50);
-    private final SystemUsage systemUsage;
-    private final MemoryUsage memoryUsage;
     private PendingMessageCursor messages;
-    private final LinkedList<MessageReference> pagedInMessages = new LinkedList<MessageReference>();
+    private final LinkedHashMap<MessageId,MessageReference> pagedInMessages = new LinkedHashMap<MessageId,MessageReference>();
     private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
-
-    private int garbageSize;
-    private int garbageSizeBeforeCollection = 1000;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
-    private final MessageStore store;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
-    private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
     private final Object exclusiveLockMutex = new Object();
     private final Object sendLock = new Object();
     private final TaskRunner taskRunner;
@@ -104,15 +94,11 @@
             }
         };
     };
-
+    
     public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,
MessageStore store, DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
-        this.broker = broker;
-        this.destination = destination;
-        this.systemUsage=systemUsage;
-        this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
-        this.memoryUsage.setUsagePortion(1.0f);
-        this.store = store;
+        super(broker, store, destination,systemUsage, parentStats);
+        
         if (destination.isTemporary() || tmpStore==null ) {
             this.messages = new VMPendingMessageCursor();
         } else {
@@ -120,19 +106,7 @@
         }
 
         this.taskRunner = taskFactory.createTaskRunner(this, "Queue  " + destination.getPhysicalName());
-
-        // Let the store know what usage manager we are using so that he can
-        // flush messages to disk
-        // when usage gets high.
-        if (store != null) {
-            store.setMemoryUsage(memoryUsage);
-        }
-
-        // let's copy the enabled property from the parent DestinationStatistics
-        this.destinationStatistics.setEnabled(parentStats.isEnabled());
-        destinationStatistics.setParent(parentStats);
         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
-
     }
 
     public void initialize() throws Exception {
@@ -204,8 +178,6 @@
     public void addSubscription(ConnectionContext context,Subscription sub) throws Exception
{
         sub.add(context, this);
         destinationStatistics.getConsumers().increment();
-        maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
-
         MessageEvaluationContext msgContext = new MessageEvaluationContext();
 
         // needs to be synchronized - so no contention with dispatching
@@ -239,7 +211,7 @@
         synchronized (pagedInMessages) {
             // Add all the matching messages in the queue to the
             // subscription.
-            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i
+            for (Iterator<MessageReference> i = pagedInMessages.values().iterator();
i
                     .hasNext();) {
                 QueueMessageReference node = (QueueMessageReference) i.next();
                 if (node.isDropped()
@@ -263,7 +235,6 @@
     public void removeSubscription(ConnectionContext context, Subscription sub)
             throws Exception {
         destinationStatistics.getConsumers().decrement();
-        maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
         // synchronize with dispatch method so that no new messages are sent
         // while
         // removing up a subscription.
@@ -309,7 +280,7 @@
             // lets copy the messages to dispatch to avoid deadlock
             List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
             synchronized (pagedInMessages) {
-                for (Iterator<MessageReference> i = pagedInMessages.iterator(); i
+                for (Iterator<MessageReference> i = pagedInMessages.values().iterator();
i
                         .hasNext();) {
                     QueueMessageReference node = (QueueMessageReference) i
                             .next();
@@ -493,40 +464,9 @@
         destinationStatistics.setParent(null);
     }
 
-    public void dropEvent() {
-        dropEvent(false);
-    }
-
-    public void dropEvent(boolean skipGc) {
-        // TODO: need to also decrement when messages expire.
-        destinationStatistics.getMessages().decrement();
-        synchronized (pagedInMessages) {
-            garbageSize++;
-        }
-        if (!skipGc && garbageSize > garbageSizeBeforeCollection) {
-            gc();
-        }
-        try {
-            taskRunner.wakeup();
-        } catch (InterruptedException e) {
-            log.warn("Task Runner failed to wakeup ", e);
-        }
-    }
-
-    public void gc() {
-        synchronized (pagedInMessages) {
-            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();)
{
-                // Remove dropped messages from the queue.
-                QueueMessageReference node = (QueueMessageReference)i.next();
-                if (node.isDropped()) {
-                    garbageSize--;
-                    i.remove();
-                    continue;
-                }
-            }
-        }
-    }
-
+	public void gc(){
+	}
+    
     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack,
MessageReference node) throws IOException {
         if (store != null && node.isPersistent()) {
             // the original ack may be a ranged ack, but we are trying to delete
@@ -589,18 +529,7 @@
         return destination;
     }
 
-    public String getDestination() {
-        return destination.getPhysicalName();
-    }
-
-    public MemoryUsage getBrokerMemoryUsage() {
-        return memoryUsage;
-    }
-
-    public DestinationStatistics getDestinationStatistics() {
-        return destinationStatistics;
-    }
-
+    
     public MessageGroupMap getMessageGroupOwners() {
         if (messageGroupOwners == null) {
             messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
@@ -632,10 +561,6 @@
         this.messageGroupMapFactory = messageGroupMapFactory;
     }
 
-    public String getName() {
-        return getActiveMQDestination().getPhysicalName();
-    }
-
     public PendingMessageCursor getMessages() {
         return this.messages;
     }
@@ -652,10 +577,6 @@
         return result;
     }
 
-    public MessageStore getMessageStore() {
-        return store;
-    }
-
     public Message[] browse() {
         List<Message> l = new ArrayList<Message>();
         try {
@@ -664,7 +585,7 @@
             log.error("caught an exception browsing " + this, e);
         }
         synchronized (pagedInMessages) {
-            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();)
{
+            for (Iterator<MessageReference> i = pagedInMessages.values().iterator();
i.hasNext();) {
                 MessageReference r = i.next();
                 r.incrementReferenceCount();
                 try {
@@ -736,15 +657,18 @@
         return null;
     }
 
-    public void purge() throws Exception {
-
-        pageInMessages();
+    public void purge() throws Exception {   
+        ConnectionContext c = createConnectionContext();
+        List<MessageReference> list = null;
+        do {
+            pageInMessages();
+            synchronized (pagedInMessages) {
+                list = new ArrayList<MessageReference>(pagedInMessages.values());
+            }
 
-        synchronized (pagedInMessages) {
-            ConnectionContext c = createConnectionContext();
-            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();)
{
+            for (MessageReference ref : list) {
                 try {
-                    QueueMessageReference r = (QueueMessageReference)i.next();
+                    QueueMessageReference r = (QueueMessageReference) ref;
 
                     // We should only delete messages that can be locked.
                     if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
@@ -752,18 +676,13 @@
                         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
                         ack.setDestination(destination);
                         ack.setMessageID(r.getMessageId());
-                        acknowledge(c, null, ack, r);
-                        r.drop();
-                        dropEvent(true);
+                        removeMessage(c, null, r, ack);
                     }
                 } catch (IOException e) {
                 }
             }
-
-            // Run gc() by hand. Had we run it in the loop it could be
-            // quite expensive.
-            gc();
-        }
+        } while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount()
> 0);
+        gc();
     }
 
     /**
@@ -799,22 +718,29 @@
      * @return the number of messages removed
      */
     public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages)
throws Exception {
-        pageInMessages();
-        int counter = 0;
-        synchronized (pagedInMessages) {
-            ConnectionContext c = createConnectionContext();
-            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();)
{
-                IndirectMessageReference r = (IndirectMessageReference)i.next();
-                if (filter.evaluate(c, r)) {
-                    removeMessage(c, r);
-                    if (++counter >= maximumMessages && maximumMessages > 0)
{
-                        break;
-                    }
+        int movedCounter = 0;
+        int count = 0;
+        ConnectionContext context = createConnectionContext();
+        List<MessageReference> list = null;
+        do {
+            pageInMessages();
+            synchronized (pagedInMessages) {
+                list = new ArrayList<MessageReference>(pagedInMessages.values());
+            }
+            for (MessageReference ref : list) {
+                IndirectMessageReference r = (IndirectMessageReference) ref;
+                if (filter.evaluate(context, r)) {
 
+                    removeMessage(context, r);
+                    if (++movedCounter >= maximumMessages
+                            && maximumMessages > 0) {
+                        return movedCounter;
+                    }
                 }
+                count++;
             }
-        }
-        return counter;
+        } while (count < this.destinationStatistics.getMessages().getCount());
+        return movedCounter;
     }
 
     /**
@@ -850,26 +776,36 @@
      * @return the number of messages copied
      */
     public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception {
-        pageInMessages();
-        int counter = 0;
-        synchronized (pagedInMessages) {
-            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();)
{
-                MessageReference r = i.next();
+        int movedCounter = 0;
+        int count = 0;
+        List<MessageReference> list = null;
+        do {
+            pageInMessages();
+            synchronized (pagedInMessages) {
+                list = new ArrayList<MessageReference>(pagedInMessages.values());
+            }
+            for (MessageReference ref : list) {
+                IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
-                    r.incrementReferenceCount();
-                    try {
-                        Message m = r.getMessage();
-                        BrokerSupport.resend(context, m, dest);
-                        if (++counter >= maximumMessages && maximumMessages >
0) {
-                            break;
+                    // We should only copy messages that can be locked.
+                    if (lockMessage(r)) {
+                        r.incrementReferenceCount();
+                        try {
+                            Message m = r.getMessage();
+                            BrokerSupport.resend(context, m, dest);
+                            if (++movedCounter >= maximumMessages
+                                    && maximumMessages > 0) {
+                                return movedCounter;
+                            }
+                        } finally {
+                            r.decrementReferenceCount();
                         }
-                    } finally {
-                        r.decrementReferenceCount();
                     }
                 }
+                count++;
             }
-        }
-        return counter;
+        } while (count < this.destinationStatistics.getMessages().getCount());
+        return movedCounter;
     }
 
     /**
@@ -900,12 +836,17 @@
      * Moves the messages matching the given filter up to the maximum number of
      * matched messages
      */
-    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception {
-        pageInMessages();
-        int counter = 0;
-        synchronized (pagedInMessages) {
-            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();)
{
-                IndirectMessageReference r = (IndirectMessageReference)i.next();
+    public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter,
ActiveMQDestination dest,int maximumMessages) throws Exception {
+        int movedCounter = 0;
+        int count = 0;
+        List<MessageReference> list = null;
+        do {
+            pageInMessages();
+            synchronized (pagedInMessages) {
+                list = new ArrayList<MessageReference>(pagedInMessages.values());
+            }
+            for (MessageReference ref : list) {
+                IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
                     // We should only move messages that can be locked.
                     if (lockMessage(r)) {
@@ -914,17 +855,19 @@
                             Message m = r.getMessage();
                             BrokerSupport.resend(context, m, dest);
                             removeMessage(context, r);
-                            if (++counter >= maximumMessages && maximumMessages
> 0) {
-                                break;
+                            if (++movedCounter >= maximumMessages
+                                    && maximumMessages > 0) {
+                                return movedCounter;
                             }
                         } finally {
                             r.decrementReferenceCount();
                         }
                     }
                 }
+                count++;
             }
-        }
-        return counter;
+        } while (count < this.destinationStatistics.getMessages().getCount());
+        return movedCounter;
     }
 
     /**
@@ -937,7 +880,6 @@
             Runnable op = messagesWaitingForSpace.removeFirst();
             op.run();
         }
-
         try {
             pageInMessages(false);
         } catch (Exception e) {
@@ -976,9 +918,21 @@
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
         ack.setDestination(destination);
         ack.setMessageID(r.getMessageId());
-        acknowledge(c, null, ack, r);
-        r.drop();
-        dropEvent();
+        removeMessage(c, null, r, ack);
+    }
+    
+    protected void removeMessage(ConnectionContext context,Subscription sub,QueueMessageReference
reference,MessageAck ack) throws IOException {
+        reference.drop();
+        acknowledge(context, sub, ack, reference);
+        destinationStatistics.getMessages().decrement();
+        synchronized(pagedInMessages) {
+            pagedInMessages.remove(reference.getMessageId());
+        }
+        try {
+            taskRunner.wakeup();
+        } catch (InterruptedException e) {
+            log.warn("Task Runner failed to wakeup ", e);
+        }
     }
 
     protected boolean lockMessage(IndirectMessageReference r) {
@@ -1008,7 +962,7 @@
     }
 
     private List<MessageReference> buildList(boolean force) throws Exception {
-        final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
+        final int toPageIn = MAXIMUM_PAGE_SIZE - pagedInMessages.size();
         List<MessageReference> result = null;
         if ((force || !consumers.isEmpty()) && toPageIn > 0) {
             messages.setMaxBatchSize(toPageIn);
@@ -1036,7 +990,9 @@
                 }
             }
             synchronized (pagedInMessages) {
-                pagedInMessages.addAll(result);
+                for(MessageReference ref:result) {
+                    pagedInMessages.put(ref.getMessageId(), ref);
+                }
             }
         }
         return result;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=613230&r1=613229&r2=613230&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Fri Jan 18 11:16:15 2008
@@ -46,22 +46,18 @@
      * 
      * @throws IOException
      */
-    protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference
n) throws IOException {
+    protected void acknowledge(final ConnectionContext context, final MessageAck ack, final
MessageReference n) throws IOException {
 
         final Destination q = n.getRegionDestination();
-        q.acknowledge(context, this, ack, n);
-
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;
         if (!ack.isInTransaction()) {
-            node.drop();
-            queue.dropEvent();
+            queue.removeMessage(context, this, node, ack);
         } else {
             node.setAcked(true);
             context.getTransaction().addSynchronization(new Synchronization() {
                 public void afterCommit() throws Exception {
-                    node.drop();
-                    queue.dropEvent();
+                    queue.removeMessage(context, QueueSubscription.this, node, ack);
                 }
 
                 public void afterRollback() throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=613230&r1=613229&r2=613230&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Fri Jan 18 11:16:15 2008
@@ -326,6 +326,7 @@
             throws Exception {
         ActiveMQDestination destination = info.getDestination();
         if (destination != null) {
+            addDestination(context, destination);
             switch (destination.getDestinationType()) {
             case ActiveMQDestination.QUEUE_TYPE:
                 queueRegion.addProducer(context, info);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=613230&r1=613229&r2=613230&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Jan 18 11:16:15 2008
@@ -18,7 +18,6 @@
 
 import java.io.IOException;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -48,14 +47,12 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
@@ -69,14 +66,9 @@
  */
 public class Topic  extends BaseDestination  implements Task{
     private static final Log LOG = LogFactory.getLog(Topic.class);
-    protected final ActiveMQDestination destination;
+    private final TopicMessageStore topicStore;
     protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
-    protected final Valve dispatchValve = new Valve(true);
-    // this could be NULL! (If an advisory)
-    protected final TopicMessageStore store;
-    private final SystemUsage systemUsage;
-    private final MemoryUsage memoryUsage;
-   
+    protected final Valve dispatchValve = new Valve(true);   
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
@@ -92,16 +84,12 @@
                 }
         };
     };
-    private final Broker broker;
+   
 
     public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store,
SystemUsage systemUsage, DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory) throws Exception {
-        this.broker = broker;
-        this.destination = destination;
-        this.store = store; // this could be NULL! (If an advisory)
-        this.systemUsage=systemUsage;
-        this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
-        this.memoryUsage.setUsagePortion(1.0f);
+        super(broker, store, destination,systemUsage, parentStats);
+        this.topicStore=store;
         //set default subscription recovery policy
         if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){
         	 subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy();
@@ -110,16 +98,6 @@
         	subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy();
         } 
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
-        // Let the store know what usage manager we are using so that he can
-        // flush messages to disk
-        // when usage gets high.
-        if (store != null) {
-            store.setMemoryUsage(memoryUsage);
-        }
-
-        // let's copy the enabled property from the parent DestinationStatistics
-        this.destinationStatistics.setEnabled(parentStats.isEnabled());
-        this.destinationStatistics.setParent(parentStats);
     }
 
     public boolean lock(MessageReference node, LockOwner sub) {
@@ -174,8 +152,8 @@
     }
 
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws
IOException {
-        if (store != null) {
-            store.deleteSubscription(key.clientId, key.subscriptionName);
+        if (topicStore != null) {
+            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
             Object removed = durableSubcribers.remove(key);
             if (removed != null) {
                 destinationStatistics.getConsumers().decrement();
@@ -194,7 +172,7 @@
                 consumers.add(subscription);
             }
 
-            if (store == null) {
+            if (topicStore == null) {
                 return;
             }
 
@@ -202,13 +180,13 @@
             String clientId = subscription.getClientId();
             String subscriptionName = subscription.getSubscriptionName();
             String selector = subscription.getConsumerInfo().getSelector();
-            SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName);
+            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
             if (info != null) {
                 // Check to see if selector changed.
                 String s1 = info.getSelector();
                 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector)))
{
                     // Need to delete the subscription
-                    store.deleteSubscription(clientId, subscriptionName);
+                    topicStore.deleteSubscription(clientId, subscriptionName);
                     info = null;
                 }
             }
@@ -222,13 +200,13 @@
                 // Thi destination is an actual destination id.
                 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());

                 // This destination might be a pattern
-                store.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
+                topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
             }
 
             final MessageEvaluationContext msgContext = new MessageEvaluationContext();
             msgContext.setDestination(destination);
             if (subscription.isRecoveryRequired()) {
-                store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener()
{
+                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener()
{
                     public boolean recoverMessage(Message message) throws Exception {
                         message.setRegionDestination(Topic.this);
                         try {
@@ -395,14 +373,14 @@
                 .getConnectionContext();
         message.setRegionDestination(this);
 
-        if (store != null && message.isPersistent()
+        if (topicStore != null && message.isPersistent()
                 && !canOptimizeOutPersistence()) {
             while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
                 if (context.getStopping().get()) {
                     throw new IOException("Connection closed, send aborted.");
                 }
             }
-            store.addMessage(context, message);
+            topicStore.addMessage(context, message);
         }
 
         message.incrementReferenceCount();
@@ -446,15 +424,15 @@
     }
 
     public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck
ack, final MessageReference node) throws IOException {
-        if (store != null && node.isPersistent()) {
+        if (topicStore != null && node.isPersistent()) {
             DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
-            store.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId());
+            topicStore.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(),
node.getMessageId());
         }
     }
 
     public void dispose(ConnectionContext context) throws IOException {
-        if (store != null) {
-            store.removeAllMessages(context);
+        if (topicStore != null) {
+            topicStore.removeAllMessages(context);
         }
         destinationStatistics.setParent(null);
     }
@@ -463,7 +441,7 @@
     }
 
     public Message loadMessage(MessageId messageId) throws IOException {
-        return store != null ? store.getMessage(messageId) : null;
+        return topicStore != null ? topicStore.getMessage(messageId) : null;
     }
 
     public void start() throws Exception {
@@ -487,8 +465,8 @@
     public Message[] browse() {
         final Set<Message> result = new CopyOnWriteArraySet<Message>();
         try {
-            if (store != null) {
-                store.recover(new MessageRecoveryListener() {
+            if (topicStore != null) {
+                topicStore.recover(new MessageRecoveryListener() {
                     public boolean recoverMessage(Message message) throws Exception {
                         result.add(message);
                         return true;
@@ -527,21 +505,7 @@
     // Properties
     // -------------------------------------------------------------------------
 
-    public MemoryUsage getBrokerMemoryUsage() {
-        return memoryUsage;
-    }
-
-    public DestinationStatistics getDestinationStatistics() {
-        return destinationStatistics;
-    }
-
-    public ActiveMQDestination getActiveMQDestination() {
-        return destination;
-    }
-
-    public String getDestination() {
-        return destination.getPhysicalName();
-    }
+    
 
     public DispatchPolicy getDispatchPolicy() {
         return dispatchPolicy;
@@ -567,10 +531,6 @@
         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
     }
 
-    public MessageStore getMessageStore() {
-        return store;
-    }
-
     public DeadLetterStrategy getDeadLetterStrategy() {
         return deadLetterStrategy;
     }
@@ -579,10 +539,7 @@
         this.deadLetterStrategy = deadLetterStrategy;
     }
 
-    public String getName() {
-        return getActiveMQDestination().getPhysicalName();
-    }
-
+    
     // Implementation methods
     // -------------------------------------------------------------------------
     protected void dispatch(final ConnectionContext context, Message message) throws Exception
{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java?rev=613230&r1=613229&r2=613230&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
Fri Jan 18 11:16:15 2008
@@ -31,22 +31,24 @@
  */
 public class SimpleDispatchPolicy implements DispatchPolicy {
 
-    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List
consumers) throws Exception {
+    public boolean dispatch(MessageReference node,MessageEvaluationContext msgContext, List<Subscription>
consumers)
+            throws Exception {
+
         int count = 0;
-        for (Iterator iter = consumers.iterator(); iter.hasNext();) {
-            Subscription sub = (Subscription)iter.next();
+        synchronized (consumers) {
+            for (Subscription sub:consumers) {
+                // Don't deliver to browsers
+                if (sub.getConsumerInfo().isBrowser()) {
+                    continue;
+                }
+                // Only dispatch to interested subscriptions
+                if (!sub.matches(node, msgContext)) {
+                    continue;
+                }
 
-            // Don't deliver to browsers
-            if (sub.getConsumerInfo().isBrowser()) {
-                continue;
-            }
-            // Only dispatch to interested subscriptions
-            if (!sub.matches(node, msgContext)) {
-                continue;
+                sub.add(node);
+                count++;
             }
-
-            sub.add(node);
-            count++;
         }
         return count > 0;
     }



Mime
View raw message