activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r618621 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ cursors/
Date Tue, 05 Feb 2008 12:14:56 GMT
Author: rajdavies
Date: Tue Feb  5 04:14:49 2008
New Revision: 618621

URL: http://svn.apache.org/viewvc?rev=618621&view=rev
Log:
refactored cursors to improve re-use

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Tue Feb  5 04:14:49 2008
@@ -83,7 +83,5 @@
     
     void setEnableAudit(boolean enableAudit);
     
-    boolean isActive();
-    
-    
+    boolean isActive();   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Tue Feb  5 04:14:49 2008
@@ -178,7 +178,8 @@
         }
         prefetchExtension = 0;
     }
-
+    
+    
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
         MessageDispatch md = super.createMessageDispatch(node, message);
         Integer count = redeliveredMessages.get(node.getMessageId());
@@ -192,7 +193,6 @@
         if (!active && !keepDurableSubsActive) {
             return;
         }
-        node.incrementReferenceCount();
         super.add(node);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Feb  5 04:14:49 2008
@@ -59,13 +59,12 @@
     protected long enqueueCounter;
     protected long dispatchCounter;
     protected long dequeueCounter;
-    protected boolean optimizedDispatch=false;
     private int maxProducersToAudit=32;
     private int maxAuditDepth=2048;
     protected final SystemUsage usageManager;
-    protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
     private final Object pendingLock = new Object();
     private final Object dispatchLock = new Object();
+    protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
 
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
         super(broker, context, info);
@@ -127,29 +126,13 @@
     }
 
     public void add(MessageReference node) throws Exception {
-		boolean pendingEmpty = false;
-		boolean dispatchPending = false;
-		synchronized (pendingLock) {
-			pendingEmpty = pending.isEmpty();
-			enqueueCounter++;
-			if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
-				pending.dispatched(node);
-				dispatch(node);
-			} else {
-				optimizePrefetch();
-				synchronized (pendingLock) {
-					if (pending.isEmpty() && LOG.isDebugEnabled()) {
-						LOG.debug("Prefetch limit.");
-					}
-					pending.addMessageLast(node);
-					dispatchPending = true;
-				}
-			}
-		}
-		if (dispatchPending) {
-			dispatchPending();
-		}
-	}
+        synchronized (pendingLock) {
+            enqueueCounter++;
+            pending.addMessageLast(node);
+            dispatchPending();
+        }
+       
+    }
 
     public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
         synchronized(pendingLock) {
@@ -441,24 +424,7 @@
         }
     }
 
-    /**
-     * optimize message consumer prefetch if the consumer supports it
-     */
-    public void optimizePrefetch() {
-        /*
-         * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
-         * &&context.getConnection().isManageable()){
-         * if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
-         * isLowWaterMark()){
-         * info.setCurrentPrefetchSize(info.getPrefetchSize());
-         * updateConsumerPrefetch(info.getPrefetchSize()); }else
-         * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() &&
-         * isHighWaterMark()){ // want to purge any outstanding acks held by the
-         * consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
-         */
-    }
-
-    public void add(ConnectionContext context, Destination destination) throws Exception {
+   public void add(ConnectionContext context, Destination destination) throws Exception {
         synchronized(pendingLock) {
             super.add(context, destination);
             pending.add(context, destination);
@@ -490,13 +456,13 @@
                             if (canDispatch(node)) {
                                 pending.remove();
                                 // Message may have been sitting in the pending
-                                // list
-                                // a while
-                                // waiting for the consumer to ak the message.
+                                // list a while waiting for the consumer to ak the message.
                                 if (node != QueueMessageReference.NULL_MESSAGE
-                                        && broker.isExpired(node)) {
+                                        && node.isExpired()) {
                                     broker.messageExpired(getContext(), node);
                                     dequeueCounter++;
+                                    //increment number to dispatch
+                                    numberToDispatch++;
                                     continue;
                                 }
                                 dispatch(node);
@@ -518,6 +484,7 @@
         }         
         // Make sure we can dispatch a message.
         if (canDispatch(node) && !isSlave()) {
+            
             MessageDispatch md = createMessageDispatch(node, message);
             // NULL messages don't count... they don't get Acked.
             if (node != QueueMessageReference.NULL_MESSAGE) {
@@ -617,14 +584,7 @@
     protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException {
     }
 
-    public boolean isOptimizedDispatch() {
-        return optimizedDispatch;
-    }
-
-    public void setOptimizedDispatch(boolean optimizedDispatch) {
-        this.optimizedDispatch = optimizedDispatch;
-    }
-
+    
     public int getMaxProducersToAudit() {
         return maxProducersToAudit;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Feb  5 04:14:49 2008
@@ -82,15 +82,14 @@
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     private final Object exclusiveLockMutex = new Object();
     private final Object sendLock = new Object();
-    private final TaskRunner taskRunner;
-    
+    private final TaskRunner taskRunner;    
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
             wakeup();
-        };
+        }
     };
-    
+           
     public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory) throws Exception {
         super(broker, store, destination,systemUsage, parentStats);
@@ -869,22 +868,31 @@
     }
 
     /**
-     * @return
+     * @return true if we would like to iterate again
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-
-        try {
-            pageInMessages(false);
-        } catch (Exception e) {
-            log.error("Failed to page in more queue messages ", e);
+        boolean result = false;
+        synchronized (messages) {
+            result = !messages.isEmpty();
+        }
+        if (result) {
+            try {
+                pageInMessages(false);
+               
+            } catch (Throwable e) {
+                log.error("Failed to page in more queue messages ", e);
+            }
         }
-        while (!messagesWaitingForSpace.isEmpty() &&!memoryUsage.isFull()) {
+        while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
             Runnable op = messagesWaitingForSpace.removeFirst();
             op.run();
         }
-        
-        return false;
+
+        synchronized (messages) {
+            result = !messages.isEmpty();
+        }
+        return result;
     }
 
     protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
@@ -949,9 +957,10 @@
         wakeup();
     }
     
-    final void wakeup() {
+    final synchronized void wakeup() {
         try {
             taskRunner.wakeup();
+            
         } catch (InterruptedException e) {
             log.warn("Task Runner failed to wakeup ", e);
         }
@@ -972,7 +981,6 @@
             int count = 0;
             result = new ArrayList<MessageReference>(toPageIn);
             synchronized (messages) {
-
                 try {
                     messages.reset();
                     while (messages.hasNext() && count < toPageIn) {
@@ -1001,16 +1009,16 @@
         return result;
     }
 
-    private synchronized void doDispatch(List<MessageReference> list) throws Exception {
+    private void doDispatch(List<MessageReference> list) throws Exception {
+       
         if (list != null && !list.isEmpty()) {
             MessageEvaluationContext msgContext = new MessageEvaluationContext();
             for (int i = 0; i < list.size(); i++) {
-                MessageReference node = list.get(i);
+                MessageReference node = list.get(i);         
                 msgContext.setDestination(destination);
                 msgContext.setMessageReference(node);
                 dispatchPolicy.dispatch(node, msgContext, consumers);
             }
-
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Tue Feb  5 04:14:49 2008
@@ -47,7 +47,6 @@
      * @throws IOException
      */
     protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
-
         final Destination q = n.getRegionDestination();
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Tue Feb  5 04:14:49 2008
@@ -174,12 +174,7 @@
      */
     void updateConsumerPrefetch(int newPrefetch);
     
-    /**
-     * optimize message consumer prefetch if the consumer supports it
-     *
-     */
-    void optimizePrefetch();
-    
+        
     /**
      * Called when the subscription is destroyed.
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Tue Feb  5 04:14:49 2008
@@ -39,6 +39,7 @@
     protected int maxAuditDepth=1;
     protected boolean enableAudit=true;
     protected ActiveMQMessageAudit audit;
+    protected boolean useCache=true;
     private boolean started=false;
   
 
@@ -264,6 +265,14 @@
      */
     public ActiveMQMessageAudit getMessageAudit() {
     	return audit;
+    }
+    
+    public boolean isUseCache() {
+        return useCache;
+    }
+
+    public void setUseCache(boolean useCache) {
+        this.useCache = useCache;
     }
 
     protected synchronized boolean  isDuplicate(MessageId messageId) {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=618621&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Tue Feb  5 04:14:49 2008
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *  Store based cursor
+ *
+ */
+public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
+    private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
+    protected final Destination regionDestination;
+    protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
+    protected boolean cacheEnabled=false;
+    protected boolean batchResetNeeded = true;
+    protected boolean storeHasMessages = false;
+    protected int size;
+    
+    protected AbstractStoreCursor(Destination destination) {
+        this.regionDestination=destination;
+    }
+    
+    public final synchronized void start() throws Exception{
+        if (!isStarted()) {
+            this.size = getStoreSize();
+            this.storeHasMessages=this.size > 0;
+            if (!this.storeHasMessages&&useCache) {
+                cacheEnabled=true;
+            }
+        }
+        super.start();
+        clear();
+        resetBatch();
+        getSystemUsage().getMemoryUsage().addUsageListener(this);
+    }
+    
+    public final synchronized void stop() throws Exception {
+        getSystemUsage().getMemoryUsage().removeUsageListener(this);
+        resetBatch();
+        gc();
+        super.stop();
+    }
+
+    
+    public final boolean recoverMessage(Message message) throws Exception {
+        return recoverMessage(message,false);
+    }
+    
+    public synchronized boolean recoverMessage(Message message, boolean cached)throws Exception {
+        if (!isDuplicate(message.getMessageId())) {
+            if (!cached) {
+                message.setRegionDestination(regionDestination);
+                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+            }
+            message.incrementReferenceCount();
+            batchList.put(message.getMessageId(), message);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ignoring batched duplicated from store: " + message);
+            }
+            storeHasMessages = true;
+        }
+        return true;
+    }
+    
+    public final void reset() {
+    }
+
+    public final void finished() {
+    }
+        
+    public final synchronized boolean hasNext() {
+        if (batchList.isEmpty()) {
+            try {
+                fillBatch();
+            } catch (Exception e) {
+                LOG.error("Failed to fill batch", e);
+                throw new RuntimeException(e);
+            }
+        }
+        boolean result= !batchList.isEmpty();
+        return result;
+    }
+    
+    public final synchronized MessageReference next() {
+        Message result = null;
+        if (!this.batchList.isEmpty()) {
+            Iterator<Entry<MessageId, Message>> i = this.batchList.entrySet().iterator();
+            result = i.next().getValue();
+            i.remove();
+        }
+        return result;
+    }
+    
+    public final synchronized void addMessageLast(MessageReference node) throws Exception {
+        if (cacheEnabled && hasSpace()) {
+            recoverMessage(node.getMessage(),true);
+        }else {
+            cacheEnabled=false;
+        }
+        size++;
+    }
+
+    public final synchronized void addMessageFirst(MessageReference node) throws Exception {
+        cacheEnabled=false;
+        size++;
+    }
+
+    public final synchronized void remove() {
+        size--;
+        if (size==0 && isStarted() && cacheEnabled) {
+            cacheEnabled=true;
+        }
+    }
+
+    public final synchronized void remove(MessageReference node) {
+        size--;
+        cacheEnabled=false;
+    }
+    
+           
+    public final synchronized void onUsageChanged(Usage usage, int oldPercentUsage,
+            int newPercentUsage) {
+        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= memoryUsageHighWaterMark) {
+            storeHasMessages = true;
+            try {
+                fillBatch();
+            } catch (Exception e) {
+                LOG.error("Failed to fill batch ", e);
+            }
+        }
+        
+    }
+    
+    public final synchronized void clear() {
+        gc();
+    }
+    
+    public final synchronized void gc() {
+        for (Message msg : batchList.values()) {
+            rollback(msg.getMessageId());
+            msg.decrementReferenceCount();
+        }
+        batchList.clear();
+        batchResetNeeded = true;
+        this.cacheEnabled=false;
+    }
+    
+    protected final synchronized void fillBatch() {
+        if (batchResetNeeded) {
+            resetBatch();
+            this.batchResetNeeded = false;
+        }
+        while (this.batchList.isEmpty() && (this.storeHasMessages || size > 0)) {
+            this.storeHasMessages = false;
+            try {
+                doFillBatch();
+            } catch (Exception e) {
+                LOG.error("Failed to fill batch", e);
+                throw new RuntimeException(e);
+            }
+            if (!this.batchList.isEmpty()) {
+                this.storeHasMessages=true;
+            }
+        }
+    }
+    
+    public final synchronized boolean isEmpty() {
+        return size <= 0;
+    }
+
+    public final synchronized boolean hasMessagesBufferedToDeliver() {
+        return !batchList.isEmpty();
+    }
+
+    public final synchronized int size() {
+        if (isStarted()) {
+            return size;
+        }
+        this.size = getStoreSize();
+        return size;
+        
+    }
+    
+    
+    protected abstract void doFillBatch() throws Exception;
+    
+    protected abstract void resetBatch();
+    
+    protected abstract int getStoreSize();
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Tue Feb  5 04:14:49 2008
@@ -65,6 +65,8 @@
      */
     public FilePendingMessageCursor(Broker broker,String name) {
         this.broker = broker;
+        //the store can be null if the BrokerService has persistence 
+        //turned off
         this.store= broker.getTempDataStore();
         this.name = NAME_COUNT.incrementAndGet() + "_" + name;
     }
@@ -167,7 +169,7 @@
             try {
                 regionDestination = node.getMessage().getRegionDestination();
                 if (isDiskListEmpty()) {
-                    if (hasSpace()) {
+                    if (hasSpace() || this.store==null) {
                         memoryList.add(node);
                         node.incrementReferenceCount();
                         return;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Tue Feb  5 04:14:49 2008
@@ -267,6 +267,17 @@
      * @return the audit - could be null
      */
     public ActiveMQMessageAudit getMessageAudit();
+    
+    /**
+     * use a cache to improve performance
+     * @param useCache
+     */
+    public void setUseCache(boolean useCache);
+    
+    /**
+     * @return true if a cache is being used
+     */
+    public boolean isUseCache();
 
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Tue Feb  5 04:14:49 2008
@@ -17,13 +17,10 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
-import java.util.LinkedList;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.MessageReference;
+
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,136 +31,22 @@
  * 
  * @version $Revision: 474985 $
  */
-class QueueStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener {
-
+class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Log LOG = LogFactory.getLog(QueueStorePrefetch.class);
-
     private MessageStore store;
-    private final LinkedList<Message> batchList = new LinkedList<Message>();
-    private Destination regionDestination;
-    private int size;
-    private boolean fillBatchDuplicates;
-    private boolean cacheEnabled;
-    private boolean useCache =false;
-
+   
     /**
      * Construct it
      * @param queue
      */
     public QueueStorePrefetch(Queue queue) {
-        this.regionDestination = queue;
+        super(queue);
         this.store = (MessageStore)queue.getMessageStore();
 
     }
 
-    public synchronized void start() throws Exception{
-        if (!isStarted()) {
-            this.size = getStoreSize();
-            if (this.size==0&&useCache) {
-                cacheEnabled=true;
-            }
-        }
-        super.start();
-        store.resetBatching();
-    }
-
-    public void stop() throws Exception {
-        store.resetBatching();
-        super.stop();
-    }
-
-    /**
-     * @return true if there are no pending messages
-     */
-    public boolean isEmpty() {
-        return size <= 0;
-    }
-
-    public boolean hasMessagesBufferedToDeliver() {
-        return !batchList.isEmpty();
-    }
-
-    public synchronized int size() {
-        if (isStarted()) {
-            return size;
-        }
-        this.size = getStoreSize();
-        return size;
-        
-    }
-
-    public synchronized void addMessageLast(MessageReference node) throws Exception {
-        if (cacheEnabled && hasSpace()) {
-            //optimization - A persistent queue will add the message to
-            //to store then retrieve it again from the store.
-            recoverMessage(node.getMessage());
-        }else {
-            cacheEnabled=false;
-        }
-        size++;
-    }
-
-    public void addMessageFirst(MessageReference node) throws Exception {
-        size++;
-    }
-
-    public synchronized void remove() {
-        size--;
-        if (size==0 && isStarted() && cacheEnabled) {
-            cacheEnabled=true;
-        }
-    }
-
-    public void remove(MessageReference node) {
-        size--;
-        cacheEnabled=false;
-    }
-
-    public synchronized boolean hasNext() {
-        if (batchList.isEmpty()) {
-            try {
-                fillBatch();
-            } catch (Exception e) {
-                LOG.error("Failed to fill batch", e);
-                throw new RuntimeException(e);
-            }
-        }
-        return !batchList.isEmpty();
-    }
-
-    public synchronized MessageReference next() {
-        Message result = batchList.removeFirst();
-        result.decrementReferenceCount();
-        result.setRegionDestination(regionDestination);
-        result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-        return result;
-    }
-
-    public void reset() {
-    }
-
-    // MessageRecoveryListener implementation
-    public void finished() {
-    }
-
-    public synchronized boolean recoverMessage(Message message)
-            throws Exception {
-        if (!isDuplicate(message.getMessageId())) {
-            message.setRegionDestination(regionDestination);
-            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-            message.incrementReferenceCount();
-            batchList.addLast(message);
-        } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Ignoring batched duplicated from store: " + message);
-            }
-            fillBatchDuplicates=true;
-        }
-        return true;
-    }
-
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
-        Message msg = store.getMessage(messageReference);
+        Message msg = this.store.getMessage(messageReference);
         if (msg != null) {
             return recoverMessage(msg);
         } else {
@@ -173,32 +56,23 @@
         }
     }
 
-    public synchronized void gc() {
-        for (Message msg : batchList) {
-            rollback(msg.getMessageId());
-            msg.decrementReferenceCount();
-        }
-        cacheEnabled=false;
-        batchList.clear();
-    }
-
-    // implementation
-    protected synchronized void fillBatch() throws Exception {
-        store.recoverNextMessages(maxBatchSize, this);
-        while (fillBatchDuplicates && batchList.isEmpty()) {
-            fillBatchDuplicates=false;
-            store.recoverNextMessages(maxBatchSize, this);
-        }
-        fillBatchDuplicates=false;
-    }
-    
+   
+        
     protected synchronized int getStoreSize() {
         try {
-            return store.getMessageCount();
+            return this.store.getMessageCount();
         } catch (IOException e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
+    }
+    
+    protected void resetBatch() {
+        this.store.resetBatching();
+    }
+    
+    protected void doFillBatch() throws Exception {
+        this.store.recoverNextMessages(this.maxBatchSize, this);
     }
 
     public String toString() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Feb  5 04:14:49 2008
@@ -50,8 +50,7 @@
     private boolean started;
     private PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
-    private final Subscription subscription;
-
+    private Subscription subscription;
     /**
      * @param broker 
      * @param topic
@@ -62,11 +61,11 @@
      * @throws IOException
      */
     public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
+        this.subscription=subscription;
         this.clientId = clientId;
         this.subscriberName = subscriberName;
-        this.subscription = subscription;
         this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
-        storePrefetches.add(nonPersistent);
+        this.storePrefetches.add(this.nonPersistent);
     }
 
     public synchronized void start() throws Exception {
@@ -99,7 +98,7 @@
      */
     public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
-            TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription);
+            TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
             tsp.setMaxBatchSize(getMaxBatchSize());
             tsp.setSystemUsage(systemUsage);
             tsp.setEnableAudit(isEnableAudit());
@@ -290,6 +289,16 @@
         }
         if (nonPersistent != null) {
             nonPersistent.setEnableAudit(enableAudit);
+        }
+    }
+    
+    public void setUseCache(boolean useCache) {
+        super.setUseCache(useCache);
+        for (PendingMessageCursor cursor : storePrefetches) {
+            cursor.setUseCache(useCache);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setUseCache(useCache);
         }
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Tue Feb  5 04:14:49 2008
@@ -231,6 +231,16 @@
             nonPersistent.setEnableAudit(enableAudit);
         }
     }
+    
+    public void setUseCache(boolean useCache) {
+        super.setUseCache(useCache);
+        if (persistent != null) {
+            persistent.setUseCache(useCache);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setUseCache(useCache);
+        }
+    }
 
 
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=618621&r1=618620&r2=618621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Feb  5 04:14:49 2008
@@ -17,21 +17,15 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.Map;
 
-import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.usage.Usage;
-import org.apache.activemq.usage.UsageListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -41,27 +35,22 @@
  * 
  * @version $Revision$
  */
-class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
-
+class TopicStorePrefetch extends AbstractStoreCursor {
     private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
     private TopicMessageStore store;
     private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
     private String clientId;
     private String subscriberName;
-    private Destination regionDestination;
-    private boolean batchResetNeeded = true;
-    private boolean storeHasMessages = false;
-    private boolean started;
-    private final Subscription subscription;
-   
+    private Subscription subscription;
+    
     /**
      * @param topic
      * @param clientId
      * @param subscriberName
      */
-    public TopicStorePrefetch(Topic topic, String clientId, String subscriberName, Subscription subscription) {
-        this.regionDestination = topic;
-        this.subscription = subscription;
+    public TopicStorePrefetch(Subscription subscription,Topic topic, String clientId, String subscriberName) {
+        super(topic);
+        this.subscription=subscription;
         this.store = (TopicMessageStore)topic.getMessageStore();
         this.clientId = clientId;
         this.subscriberName = subscriberName;
@@ -69,109 +58,6 @@
         this.maxAuditDepth=10000;
     }
 
-    public synchronized void start() throws Exception {
-        if (!started) {
-            started = true;
-            super.start();
-            this.storeHasMessages = getStoreSize() > 0;
-            getSystemUsage().getMemoryUsage().addUsageListener(this);
-        }
-    }
-
-    public synchronized void stop() throws Exception {
-        if (started) {
-            started = false;
-            getSystemUsage().getMemoryUsage().removeUsageListener(this);
-            super.stop();
-            store.resetBatching(clientId, subscriberName);
-            gc();
-        }
-    }
-
-    /**
-     * @return true if there are no pendingCount messages
-     */
-    public synchronized boolean isEmpty() {
-        safeFillBatch();
-        return batchList.isEmpty();
-    }
-
-    public synchronized int size() {
-        safeFillBatch();
-        return batchList.size();
-    }
-
-    public synchronized void addMessageLast(MessageReference node) throws Exception {
-        if (node != null) {
-        	storeHasMessages=true;
-            node.decrementReferenceCount();
-        }
-    }
-
-    public synchronized void addMessageFirst(MessageReference node) throws Exception {
-        if (node != null) {
-        	storeHasMessages=true;
-            node.decrementReferenceCount();
-            rollback(node.getMessageId());
-        }
-    }
-
-    public synchronized void remove() {
-    }
-
-    public synchronized void remove(MessageReference node) {
-    }
-
-    public synchronized void clear() {
-        gc();
-    }
-
-    public synchronized boolean hasNext() {
-        boolean result =  !isEmpty();
-        return result;
-    }
-
-    public synchronized MessageReference next() {
-        Message result = null;
-        safeFillBatch();
-        if (batchList.isEmpty()) {
-            return null;
-        } else {
-            Iterator i = batchList.entrySet().iterator();
-            result = (Message) ((Map.Entry)i.next()).getValue();
-            i.remove();
-            result.setRegionDestination(regionDestination);
-            result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-        }
-        return result;
-    }
-
-    public void reset() {
-    }
-
-    // MessageRecoveryListener implementation
-    public void finished() {
-    }
-
-    public synchronized boolean recoverMessage(Message message)
-            throws Exception {
-        MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
-        messageEvaluationContext.setMessageReference(message);
-        if (subscription.matches(message, messageEvaluationContext)) {
-            message.setRegionDestination(regionDestination);
-            if (!isDuplicate(message.getMessageId())) {
-                // only increment if count is zero (could have been cached)
-                if (message.getReferenceCount() == 0) {
-                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-                    message.incrementReferenceCount();
-                   
-                }
-                batchList.put(message.getMessageId(), message);
-            }
-        }
-        return true;
-    }
-
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
         // shouldn't get called
         throw new RuntimeException("Not supported");
@@ -190,32 +76,18 @@
             }
         }
     }
-
-    // implementation
-    protected synchronized void safeFillBatch() {
-        try {
-            fillBatch();
-        } catch (Exception e) {
-            LOG.error("Failed to fill batch", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    protected synchronized void fillBatch() throws Exception {
-        if (batchResetNeeded) {
-            this.store.resetBatching(clientId, subscriberName);
-            this.batchResetNeeded = false;
-        }
-        while (this.batchList.isEmpty() && this.storeHasMessages) {
-            this.storeHasMessages = false;
-            this.store.recoverNextMessages(clientId, subscriberName,
-                    maxBatchSize, this);
-            if (!this.batchList.isEmpty()) {
-                this.storeHasMessages=true;
-            }
+    
+    public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
+        MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
+        messageEvaluationContext.setMessageReference(message);
+        if (this.subscription.matches(message, messageEvaluationContext)) {
+            return super.recoverMessage(message, cached);
         }
+        return false;
+        
     }
 
+   
     protected synchronized int getStoreSize() {
         try {
             return store.getMessageCount(clientId, subscriberName);
@@ -225,24 +97,14 @@
         }
     }
 
-    public synchronized void gc() {
-        for (Message msg : batchList.values()) {
-            rollback(msg.getMessageId());
-            msg.decrementReferenceCount();
-        }
-        batchList.clear();
-        batchResetNeeded = true;
+            
+    protected void resetBatch() {
+        this.store.resetBatching(clientId, subscriberName);
     }
     
-    public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
-        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
-        	storeHasMessages = true;
-            try {
-                fillBatch();
-            } catch (Exception e) {
-                LOG.error("Failed to fill batch ", e);
-            }
-        }
+    protected void doFillBatch() throws Exception {
+        this.store.recoverNextMessages(clientId, subscriberName,
+                maxBatchSize, this);
     }
 
     public String toString() {



Mime
View raw message