Author: rajdavies
Date: Thu Nov 23 00:19:00 2006
New Revision: 478509
URL: http://svn.apache.org/viewvc?view=rev&rev=478509
Log:
support for different message storage cursor types for Queue destination policies
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
Thu Nov 23 00:19:00 2006
@@ -127,7 +127,7 @@
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
- entry.configure(queue);
+ entry.configure(queue,broker.getTempDataStore());
}
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Nov 23 00:19:00 2006
@@ -366,6 +366,11 @@
if(!skipGc&&garbageSize>garbageSizeBeforeCollection){
gc();
}
+ try{
+ taskRunner.wakeup();
+ }catch(InterruptedException e){
+ log.warn("Task Runner failed to wakeup ",e);
+ }
}
public void gc() {
@@ -379,11 +384,6 @@
continue;
}
}
- }
- try{
- taskRunner.wakeup();
- }catch(InterruptedException e){
- log.warn("Task Runner failed to wakeup ",e);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Thu Nov 23 00:19:00 2006
@@ -129,10 +129,6 @@
// implementation
protected void fillBatch() throws Exception{
store.recoverNextMessages(maxBatchSize,this);
- // this will add more messages to the batch list
- if(!batchList.isEmpty()){
- Message message=(Message)batchList.getLast();
- }
}
public String toString() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Thu Nov 23 00:19:00 2006
@@ -57,6 +57,7 @@
nonPersistent.setMaxBatchSize(getMaxBatchSize());
}
nonPersistent.start();
+ persistent.start();
pendingCount=persistent.size();
}
@@ -65,6 +66,7 @@
if(nonPersistent!=null){
nonPersistent.stop();
}
+ persistent.stop();
pendingCount=0;
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?view=auto&rev=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a FilePendingMessageCursor
+ * *
+ * @org.apache.xbean.XBean element="fileCursor" description="Pending messages paged in from
file"
+ *
+ * @version $Revision$
+ */
+public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
+
+ /**
+ * @param queue
+ * @param tmpStore
+ * @return the cursor
+ * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
org.apache.activemq.kaha.Store)
+ */
+ public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
+ return new FilePendingMessageCursor("PendingCursor:" + queue.getName(),tmpStore);
+ }
+
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -14,7 +14,10 @@
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
/**
* Abstraction to allow different policies for holding messages awaiting dispatch on a Queue
@@ -25,8 +28,10 @@
/**
* Retrieve the configured pending message storage cursor;
+ * @param queue
+ * @param tmpStore
* @return the cursor
*
*/
- public PendingMessageCursor getQueuePendingMessageCursor();
+ public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Nov 23 00:19:00 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,7 +50,7 @@
private MessageGroupMapFactory messageGroupMapFactory;
private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
- public void configure(Queue queue) {
+ public void configure(Queue queue, Store tmpStore) {
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
}
@@ -61,7 +62,7 @@
queue.getUsageManager().setLimit(memoryLimit);
}
if (pendingQueueMessageStoragePolicy != null) {
- PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor();
+ PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(queue,tmpStore);
queue.setMessages(messages);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -20,7 +20,7 @@
/**
* Creates a PendingMessageCursor that access the persistent store to retrieve messages
- * *
+ *
* @org.apache.xbean.XBean element="storeDurableSubscriberCursor" description="Pending messages
for a durable subscriber
* are referenced from the Store"
*
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java?view=auto&rev=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a StoreQueueCursor
+ * *
+ * @org.apache.xbean.XBean element="storeCursor" description="Pending messages paged in from
the Store"
+ *
+ * @version $Revision$
+ */
+public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
+
+ /**
+ * @param queue
+ * @param tmpStore
+ * @return the cursor
+ * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
org.apache.activemq.kaha.Store)
+ */
+ public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
+ return new StoreQueueCursor(queue,tmpStore);
+ }
+
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
Thu Nov 23 00:19:00 2006
@@ -1,41 +1,39 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor
- *
- ** @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the
JVM"
+ * *
+ * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
*
* @version $Revision$
*/
-public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy
{
-
+public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
/**
- * @return the Pending Message cursor
- * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getgetQueuePendingMessageCursor()
+ * @param queue
+ * @param tmpStore
+ * @return the cursor
*/
- public PendingMessageCursor getQueuePendingMessageCursor(){
- return new VMPendingMessageCursor();
+ public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
+ return new VMPendingMessageCursor();
}
-
}
|