activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From djen...@apache.org
Subject svn commit: r732182 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Date Tue, 06 Jan 2009 23:48:45 GMT
Author: djencks
Date: Tue Jan  6 15:48:44 2009
New Revision: 732182

URL: http://svn.apache.org/viewvc?rev=732182&view=rev
Log:
Remove some dangerous method calls from the constructor (that did nothing), make some fields
final, remove duplicate start tracking, and finish generic conversion

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=732182&r1=732181&r2=732182&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Tue Jan  6 15:48:44 2009
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -44,22 +44,19 @@
 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
 
     private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
-    private String clientId;
-    private String subscriberName;
-    private Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination,
TopicStorePrefetch>();
-    private List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
-    private boolean started;
-    private PendingMessageCursor nonPersistent;
+    private final String clientId;
+    private final String subscriberName;
+    private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination,
TopicStorePrefetch>();
+    private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
+    private final PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
-    private Subscription subscription;
+    private final Subscription subscription;
     /**
-     * @param broker 
-     * @param topic
-     * @param clientId
-     * @param subscriberName
-     * @param maxBatchSize 
-     * @param subscription 
-     * @throws IOException
+     * @param broker Broker for this cursor
+     * @param clientId clientId for this cursor
+     * @param subscriberName subscriber name for this cursor
+     * @param maxBatchSize currently ignored
+     * @param subscription  subscription for this cursor
      */
     public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int
maxBatchSize, Subscription subscription) {
         this.subscription=subscription;
@@ -70,17 +67,14 @@
         }else {
             this.nonPersistent = new VMPendingMessageCursor();
         }
-        this.nonPersistent.setMaxBatchSize(getMaxBatchSize());
+        //TODO is this correct? we are ignoring the constructor parameter matchBatchSize
+//        this.nonPersistent.setMaxBatchSize(getMaxBatchSize());
         this.nonPersistent.setSystemUsage(systemUsage);
-        this.nonPersistent.setEnableAudit(isEnableAudit());
-        this.nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
-        this.nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
         this.storePrefetches.add(this.nonPersistent);
     }
 
     public synchronized void start() throws Exception {
-        if (!started) {
-            started = true;
+        if (!isStarted()) {
             super.start();
             for (PendingMessageCursor tsp : storePrefetches) {
             	tsp.setMessageAudit(getMessageAudit());
@@ -90,8 +84,7 @@
     }
 
     public synchronized void stop() throws Exception {
-        if (started) {
-            started = false;
+        if (isStarted()) {
             super.stop();
             for (PendingMessageCursor tsp : storePrefetches) {
                 tsp.stop();
@@ -116,7 +109,7 @@
             tsp.setMaxProducersToAudit(getMaxProducersToAudit());
             topics.put(destination, tsp);
             storePrefetches.add(tsp);
-            if (started) {
+            if (isStarted()) {
                 tsp.start();
             }
         }
@@ -130,7 +123,7 @@
      * @throws Exception
      */
     public synchronized List<MessageReference> remove(ConnectionContext context, Destination
destination) throws Exception {
-        Object tsp = topics.remove(destination);
+        PendingMessageCursor tsp = topics.remove(destination);
         if (tsp != null) {
             storePrefetches.remove(tsp);
         }
@@ -161,7 +154,7 @@
      * Informs the Broker if the subscription needs to intervention to recover
      * it's state e.g. DurableTopicSubscriber may do
      * 
-     * @see org.apache.activemq.region.cursors.PendingMessageCursor
+     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
      * @return true if recovery required
      */
     public boolean isRecoveryRequired() {
@@ -171,7 +164,7 @@
     public synchronized void addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
-            if (started) {
+            if (isStarted()) {
                 if (!msg.isPersistent()) {
                     nonPersistent.addMessageLast(node);
                 }
@@ -228,16 +221,14 @@
     }
 
     public synchronized void reset() {
-        for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();)
{
-            AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next();
-            tsp.reset();
+        for (PendingMessageCursor storePrefetch : storePrefetches) {
+            storePrefetch.reset();
         }
     }
 
     public synchronized void release() {
-        for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();)
{
-            AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next();
-            tsp.release();
+        for (PendingMessageCursor storePrefetch : storePrefetches) {
+            storePrefetch.release();
         }
     }
 
@@ -250,24 +241,21 @@
     }
 
     public void setMaxBatchSize(int maxBatchSize) {
-        for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();)
{
-            AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next();
-            tsp.setMaxBatchSize(maxBatchSize);
+        for (PendingMessageCursor storePrefetch : storePrefetches) {
+            storePrefetch.setMaxBatchSize(maxBatchSize);
         }
         super.setMaxBatchSize(maxBatchSize);
     }
 
     public synchronized void gc() {
-        for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();)
{
-            PendingMessageCursor tsp = i.next();
+        for (PendingMessageCursor tsp : storePrefetches) {
             tsp.gc();
         }
     }
 
     public void setSystemUsage(SystemUsage usageManager) {
         super.setSystemUsage(usageManager);
-        for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();)
{
-            PendingMessageCursor tsp = i.next();
+        for (PendingMessageCursor tsp : storePrefetches) {
             tsp.setSystemUsage(usageManager);
         }
     }
@@ -303,8 +291,7 @@
     protected synchronized PendingMessageCursor getNextCursor() throws Exception {
         if (currentCursor == null || currentCursor.isEmpty()) {
             currentCursor = null;
-            for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();)
{
-                AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next();
+            for (PendingMessageCursor tsp : storePrefetches) {
                 if (tsp.hasNext()) {
                     currentCursor = tsp;
                     break;



Mime
View raw message