activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r504501 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/store/rapid/ test/java/or...
Date Wed, 07 Feb 2007 11:12:54 GMT
Author: rajdavies
Date: Wed Feb  7 03:12:53 2007
New Revision: 504501

URL: http://svn.apache.org/viewvc?view=rev&rev=504501
Log:
fix some problems in Quick Journal - now message containers are using Kaha maps instead of lists

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java (with props)
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MessageIdMarshaller.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Wed Feb  7 03:12:53 2007
@@ -22,12 +22,12 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.memory.UsageListener;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.store.kahadaptor.CommandMarshaller;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java?view=auto&rev=504501
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java
Wed Feb  7 03:12:53 2007
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Marshall a Message or a MessageReference
+ * @version $Revision: 1.10 $
+ */
+public class CommandMarshaller implements Marshaller<Object> {
+    
+    private WireFormat wireFormat;
+    public CommandMarshaller(WireFormat wireFormat){
+        this.wireFormat = wireFormat;
+      
+    }
+    
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
+        ByteSequence packet = wireFormat.marshal(object);
+        dataOut.writeInt(packet.length);
+        dataOut.write(packet.data, packet.offset, packet.length);
+    }
+
+   
+    public Object readPayload(DataInput dataIn) throws IOException{
+        int size=dataIn.readInt();
+        byte[] data=new byte[size];
+        dataIn.readFully(data);
+        return wireFormat.unmarshal(new ByteSequence(data));
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
Wed Feb  7 03:12:53 2007
@@ -20,21 +20,20 @@
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.MessageIdMarshaller;
 import org.apache.activemq.kaha.MessageMarshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreFactory;
-import org.apache.activemq.kaha.StringMarshaller;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
@@ -78,7 +77,7 @@
         Set<ActiveMQDestination> rc=new HashSet<ActiveMQDestination>();
         try{
             Store store=getStore();
-            for(Iterator i=store.getListContainerIds().iterator();i.hasNext();){
+            for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
                 Object obj=i.next();
                 if(obj instanceof ActiveMQDestination){
                     rc.add((ActiveMQDestination) obj);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Wed Feb  7 03:12:53 2007
@@ -110,7 +110,7 @@
 
     public void addReferenceFileIdsInUse(Set<Integer> rc){
         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
-            ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
+            ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry);
             rc.add(msg.data.getFileId());
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Wed Feb  7 03:12:53 2007
@@ -32,6 +32,7 @@
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.MessageIdMarshaller;
 import org.apache.activemq.kaha.MessageMarshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.store.MessageStore;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Wed Feb  7 03:12:53 2007
@@ -100,7 +100,7 @@
         for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
             TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
             if(subAck.getCount()>0){
-                ReferenceRecord rr=(ReferenceRecord)messageContainer.get(subAck.getMessageEntry());
+                ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
                 rc.add(rr.data.getFileId());
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
Wed Feb  7 03:12:53 2007
@@ -48,6 +48,7 @@
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Store;
@@ -60,9 +61,6 @@
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller;
-import org.apache.activemq.store.kahadaptor.CommandMarshaller;
-import org.apache.activemq.store.kahadaptor.KahaTopicMessageStore;
 import org.apache.activemq.store.kahadaptor.TopicSubAckMarshaller;
 import org.apache.activemq.store.rapid.RapidTransactionStore.Tx;
 import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
Wed Feb  7 03:12:53 2007
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.perf;
 
 import javax.jms.Connection;
@@ -22,37 +19,38 @@
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Session;
-
 import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
  * @version $Revision: 1.3 $
  */
 public class SimpleTopicTest extends TestCase{
+
     private final Log log=LogFactory.getLog(getClass());
-    
     protected BrokerService broker;
-//    protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
+    // protected String
+    // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
     protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=true";
-    //protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=false";
-    //protected String bindAddress="vm://localhost?marshal=true";
-    //protected String bindAddress="vm://localhost";
+    // protected String
+    // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=false";
+    // protected String bindAddress="vm://localhost?marshal=true";
+    // protected String bindAddress="vm://localhost";
     protected PerfProducer[] producers;
     protected PerfConsumer[] consumers;
     protected String DESTINATION_NAME=getClass().getName();
-    protected int SAMPLE_COUNT = 30;
-    protected long SAMPLE_INTERVAL = 2000;
-    protected int NUMBER_OF_CONSUMERS=10;
+    protected int SAMPLE_COUNT=30;
+    protected long SAMPLE_INTERVAL=2000;
+    protected int NUMBER_OF_CONSUMERS=1;
     protected int NUMBER_OF_PRODUCERS=1;
     protected int PAYLOAD_SIZE=1024;
     protected byte[] array=null;
     protected ConnectionFactory factory;
     protected Destination destination;
-    protected long CONSUMER_SLEEP_DURATION = 0;
+    protected long CONSUMER_SLEEP_DURATION=0;
 
     /**
      * Sets up a test where the producer and consumer have their own connection.
@@ -66,11 +64,9 @@
         factory=createConnectionFactory();
         Connection con=factory.createConnection();
         Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
         destination=createDestination(session,DESTINATION_NAME);
         log.info("Testing against destination: "+destination);
         log.info("Running "+NUMBER_OF_PRODUCERS+" producer(s) and "+NUMBER_OF_CONSUMERS+" consumer(s)");
-        
         con.close();
         producers=new PerfProducer[NUMBER_OF_PRODUCERS];
         consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
@@ -81,7 +77,7 @@
         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
             array=new byte[PAYLOAD_SIZE];
             for(int j=i;j<array.length;j++){
-                array[j]=(byte) j;
+                array[j]=(byte)j;
             }
             producers[i]=createProducer(factory,destination,i,array);
         }
@@ -118,7 +114,8 @@
         return answer;
     }
 
-    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number, byte[] payload) throws JMSException{
+    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,byte[] payload)
+            throws JMSException{
         return new PerfProducer(fac,dest,payload);
     }
 
@@ -129,29 +126,25 @@
     protected void configureBroker(BrokerService answer) throws Exception{
         answer.addConnector(bindAddress);
         answer.setDeleteAllMessagesOnStartup(true);
-        
     }
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
         return new ActiveMQConnectionFactory(bindAddress);
     }
 
-    public void testPerformance() throws JMSException, InterruptedException{
-    	
+    public void testPerformance() throws JMSException,InterruptedException{
         for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
             consumers[i].start();
         }
         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
             producers[i].start();
         }
-        
-    	log.info("Sampling performance "+SAMPLE_COUNT+" times at a "+SAMPLE_INTERVAL+" ms interval.");
-        for(int i=0; i < SAMPLE_COUNT; i++){
-        	Thread.sleep(SAMPLE_INTERVAL);
+        log.info("Sampling performance "+SAMPLE_COUNT+" times at a "+SAMPLE_INTERVAL+" ms interval.");
+        for(int i=0;i<SAMPLE_COUNT;i++){
+            Thread.sleep(SAMPLE_INTERVAL);
             dumpProducerRate();
             dumpConsumerRate();
         }
-        
         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
             producers[i].stop();
         }
@@ -164,11 +157,11 @@
         int totalRate=0;
         int totalCount=0;
         for(int i=0;i<producers.length;i++){
-        	PerfRate rate = producers[i].getRate().cloneAndReset();
+            PerfRate rate=producers[i].getRate().cloneAndReset();
             totalRate+=rate.getRate();
             totalCount+=rate.getTotalCount();
         }
-        int avgRate = totalRate/producers.length;
+        int avgRate=totalRate/producers.length;
         log.info("Avg producer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", sent = "+totalCount);
     }
 
@@ -176,11 +169,13 @@
         int totalRate=0;
         int totalCount=0;
         for(int i=0;i<consumers.length;i++){
-        	PerfRate rate = consumers[i].getRate().cloneAndReset();
+            PerfRate rate=consumers[i].getRate().cloneAndReset();
             totalRate+=rate.getRate();
             totalCount+=rate.getTotalCount();
         }
-        int avgRate = totalRate/consumers.length;
-        log.info("Avg consumer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", received = "+totalCount);
+        if(consumers!=null&&consumers.length>0){
+            int avgRate=totalRate/consumers.length;
+            log.info("Avg consumer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", received = "+totalCount);
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java
Wed Feb  7 03:12:53 2007
@@ -25,7 +25,7 @@
  * @version $Revision: 1.3 $
  */
 public class SlowConsumer extends PerfConsumer{
-    public SlowConsumer(ConnectionFactory fac,Destination dest,String consumerName,boolean slowConsumer)
+    public SlowConsumer(ConnectionFactory fac,Destination dest,String consumerName)
                     throws JMSException{
         super(fac,dest,consumerName);
     }
@@ -36,6 +36,7 @@
 
     public void onMessage(Message msg){
         super.onMessage(msg);
+        System.err.println("GOT A MSG " + msg);
         try{
             Thread.sleep(10000);
         }catch(InterruptedException e){

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java?view=diff&rev=504501&r1=504500&r2=504501
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
Wed Feb  7 03:12:53 2007
@@ -1,65 +1,74 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.perf;
 
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Session;
-import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.xbean.BrokerFactoryBean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.core.io.Resource;
+
 /**
  * @version $Revision: 1.3 $
  */
 public class SlowConsumerTopicTest extends SimpleTopicTest{
+
     protected PerfConsumer[] slowConsumers;
     protected int NUMBER_OF_SLOW_CONSUMERS=1;
-    
-    
+
     protected void setUp() throws Exception{
+        NUMBER_OF_CONSUMERS=0;
+        PAYLOAD_SIZE=10 * 1024;
         super.setUp();
-        
         slowConsumers=new SlowConsumer[NUMBER_OF_SLOW_CONSUMERS];
         for(int i=0;i<NUMBER_OF_SLOW_CONSUMERS;i++){
-            consumers[i]=createSlowConsumer(factory,destination,i);
-            consumers[i].start();
+            slowConsumers[i]=createSlowConsumer(factory,destination,i);
+            slowConsumers[i].start();
         }
     }
-    
+
     protected PerfConsumer createSlowConsumer(ConnectionFactory fac,Destination dest,int number) throws JMSException{
         return new SlowConsumer(fac,dest);
     }
-    
+
+    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,byte[] payload)
+            throws JMSException{
+        PerfProducer result=super.createProducer(fac,dest,number,payload);
+        result.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        return result;
+    }
+
     protected BrokerService createBroker() throws Exception{
         Resource resource=new ClassPathResource("org/apache/activemq/perf/slowConsumerBroker.xml");
         BrokerFactoryBean factory=new BrokerFactoryBean(resource);
         factory.afterPropertiesSet();
-        BrokerService broker =factory.getBroker();
+        BrokerService broker=factory.getBroker();
         broker.start();
         return broker;
+    }
+    
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+        ActiveMQConnectionFactory result = super.createConnectionFactory();
+        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
+        policy.setTopicPrefetch(1000);
+        result.setPrefetchPolicy(policy);
+        return result;
     }
 }



Mime
View raw message