activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r478509 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ cursors/ policy/
Date Thu, 23 Nov 2006 08:19:01 GMT
Author: rajdavies
Date: Thu Nov 23 00:19:00 2006
New Revision: 478509

URL: http://svn.apache.org/viewvc?view=rev&rev=478509
Log:
support for different message storage cursor types for Queue destination policies

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
Thu Nov 23 00:19:00 2006
@@ -127,7 +127,7 @@
         if (broker.getDestinationPolicy() != null) {
             PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
             if (entry != null) {
-                entry.configure(queue);
+                entry.configure(queue,broker.getTempDataStore());
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Nov 23 00:19:00 2006
@@ -366,6 +366,11 @@
         if(!skipGc&&garbageSize>garbageSizeBeforeCollection){
             gc();
         }
+        try{
+            taskRunner.wakeup();
+        }catch(InterruptedException e){
+            log.warn("Task Runner failed to wakeup ",e);
+        }
     }
 
     public void gc() {
@@ -379,11 +384,6 @@
                     continue;
                 }
             }
-        }
-        try{
-            taskRunner.wakeup();
-        }catch(InterruptedException e){
-            log.warn("Task Runner failed to wakeup ",e);
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Thu Nov 23 00:19:00 2006
@@ -129,10 +129,6 @@
     // implementation
     protected void fillBatch() throws Exception{
         store.recoverNextMessages(maxBatchSize,this);
-        // this will add more messages to the batch list
-        if(!batchList.isEmpty()){
-            Message message=(Message)batchList.getLast();
-        }
     }
     
     public String toString() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Thu Nov 23 00:19:00 2006
@@ -57,6 +57,7 @@
             nonPersistent.setMaxBatchSize(getMaxBatchSize());
         }
         nonPersistent.start();
+        persistent.start();
         pendingCount=persistent.size();
     }
 
@@ -65,6 +66,7 @@
         if(nonPersistent!=null){
             nonPersistent.stop();
         }
+        persistent.stop();
         pendingCount=0;
     }
 

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?view=auto&rev=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -0,0 +1,43 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a FilePendingMessageCursor
+ *  *
+ * @org.apache.xbean.XBean element="fileCursor" description="Pending messages paged in from
file"
+ * 
+ * @version $Revision$
+ */
+public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
+
+    /**
+     * @param queue
+     * @param tmpStore
+     * @return the cursor
+     * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
org.apache.activemq.kaha.Store)
+     */
+    public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
+         return new FilePendingMessageCursor("PendingCursor:" + queue.getName(),tmpStore);
+    }
+
+    
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -14,7 +14,10 @@
 
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
 
 /**
  * Abstraction to allow different policies for holding messages awaiting dispatch on a Queue
@@ -25,8 +28,10 @@
 
     /**
      * Retrieve the configured pending message storage cursor;
+     * @param queue 
+     * @param tmpStore
      * @return the cursor
      * 
      */
-    public PendingMessageCursor getQueuePendingMessageCursor();
+    public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore);
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Nov 23 00:19:00 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.kaha.Store;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -49,7 +50,7 @@
     private MessageGroupMapFactory messageGroupMapFactory;
     private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
     
-    public void configure(Queue queue) {
+    public void configure(Queue queue, Store tmpStore) {
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
         }
@@ -61,7 +62,7 @@
             queue.getUsageManager().setLimit(memoryLimit);
         }
         if (pendingQueueMessageStoragePolicy != null) {
-            PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor();
+            PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(queue,tmpStore);
             queue.setMessages(messages);
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -20,7 +20,7 @@
 
 /**
  * Creates a PendingMessageCursor that access the persistent store to retrieve messages
- *  *
+ *  
  * @org.apache.xbean.XBean element="storeDurableSubscriberCursor" description="Pending messages
for a durable subscriber
  *                         are referenced from the Store"
  * 

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java?view=auto&rev=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -0,0 +1,43 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a StoreQueueCursor
+ *  *
+ * @org.apache.xbean.XBean element="storeCursor" description="Pending messages paged in from
the Store"
+ * 
+ * @version $Revision$
+ */
+public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
+
+    /**
+     * @param queue
+     * @param tmpStore
+     * @return the cursor
+     * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
org.apache.activemq.kaha.Store)
+     */
+    public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
+        return new StoreQueueCursor(queue,tmpStore);
+    }
+
+    
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -1,41 +1,39 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.kaha.Store;
 
 /**
  * Creates a VMPendingMessageCursor
- * 
- ** @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the
JVM"
+ *  *
+ * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
  * 
  * @version $Revision$
  */
-public class VMPendingQueueMessageStoragePolicy implements  PendingQueueMessageStoragePolicy
{
-    
+public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
 
     /**
-     * @return the Pending Message cursor
-     * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getgetQueuePendingMessageCursor()
+     * @param queue
+     * @param tmpStore
+     * @return the cursor 
      */
-    public PendingMessageCursor getQueuePendingMessageCursor(){
-       return new VMPendingMessageCursor();
+    public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
+        return new VMPendingMessageCursor();
     }
-
 }



Mime
View raw message