activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r912652 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/bugs/ test/java/org/apache/activemq/bugs/amq1974/
Date Mon, 22 Feb 2010 16:54:23 GMT
Author: rajdavies
Date: Mon Feb 22 16:54:23 2010
New Revision: 912652

URL: http://svn.apache.org/viewvc?rev=912652&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2616

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java 
 (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=912652&r1=912651&r2=912652&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
Mon Feb 22 16:54:23 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@@ -23,6 +24,8 @@
 import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -31,6 +34,7 @@
  * @version $Revision: 1.28 $
  */
 public class TempQueue extends Queue{
+    private static final Log LOG = LogFactory.getLog(TempQueue.class);
     private final ActiveMQTempDestination tempDest;
    
     
@@ -50,6 +54,7 @@
         this.tempDest = (ActiveMQTempDestination) destination;
     }
     
+    @Override
     public void initialize() throws Exception {
         this.messages=new VMPendingMessageCursor();
         this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
@@ -58,6 +63,7 @@
         this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());
     }
     
+    @Override
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
{
         // Only consumers on the same connection can consume from
         // the temporary destination
@@ -74,4 +80,14 @@
         }
         super.addSubscription(context, sub);
     }
+    
+    @Override
+    public void dispose(ConnectionContext context) throws IOException {
+        try {
+           purge();
+        } catch (Exception e) {
+          LOG.warn("Caught an exception purging Queue: " + destination);
+        }
+        super.dispose(context);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=912652&r1=912651&r2=912652&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Feb 22 16:54:23 2010
@@ -86,7 +86,12 @@
             clearIterator(true);
             recovered = true;
         } else {
-            LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor
got duplicate: " + message);
+            /*
+             * we should expect to get these - as the message is recorded as it before it
goes into
+             * the cache. If subsequently, we pull out that message from the store (before
its deleted)
+             * it will be a duplicate - but should be ignored
+             */
+            //LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + "
cursor got duplicate: " + message);
             storeHasMessages = true;
         }
         return recovered;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=912652&r1=912651&r2=912652&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Mon Feb 22 16:54:23 2010
@@ -20,7 +20,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -33,38 +32,39 @@
  * @version $Revision$
  */
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
-    private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
+    private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
     private Iterator<MessageReference> iter;
-    public VMPendingMessageCursor(){
-        this.useCache=false;
+    public VMPendingMessageCursor() {
+        this.useCache = false;
     }
 
-    
     @Override
-    public synchronized List<MessageReference> remove(ConnectionContext context, Destination
destination) throws Exception {
-    	List<MessageReference> rc = new ArrayList<MessageReference>();
+    public synchronized List<MessageReference> remove(ConnectionContext context, Destination
destination)
+            throws Exception {
+        List<MessageReference> rc = new ArrayList<MessageReference>();
         for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();)
{
             MessageReference r = iterator.next();
-            if( r.getRegionDestination()==destination ) {
+            if (r.getRegionDestination() == destination) {
                 r.decrementReferenceCount();
                 rc.add(r);
                 iterator.remove();
             }
         }
-        return rc ;        
+        return rc;
     }
-    
+
     /**
      * @return true if there are no pending messages
      */
+    @Override
     public synchronized boolean isEmpty() {
         if (list.isEmpty()) {
             return true;
         } else {
             for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();)
{
                 MessageReference node = iterator.next();
-                if (node== QueueMessageReference.NULL_MESSAGE){
-                	continue;
+                if (node == QueueMessageReference.NULL_MESSAGE) {
+                    continue;
                 }
                 if (!node.isDropped()) {
                     return false;
@@ -79,6 +79,7 @@
     /**
      * reset the cursor
      */
+    @Override
     public synchronized void reset() {
         iter = list.listIterator();
         last = null;
@@ -89,6 +90,7 @@
      * 
      * @param node
      */
+    @Override
     public synchronized void addMessageLast(MessageReference node) {
         node.incrementReferenceCount();
         list.addLast(node);
@@ -100,6 +102,7 @@
      * @param position
      * @param node
      */
+    @Override
     public synchronized void addMessageFirst(MessageReference node) {
         node.incrementReferenceCount();
         list.addFirst(node);
@@ -108,6 +111,7 @@
     /**
      * @return true if there pending messages to dispatch
      */
+    @Override
     public synchronized boolean hasNext() {
         return iter.hasNext();
     }
@@ -115,8 +119,9 @@
     /**
      * @return the next pending message
      */
+    @Override
     public synchronized MessageReference next() {
-        last = (MessageReference)iter.next();
+        last = iter.next();
         if (last != null) {
             last.incrementReferenceCount();
         }
@@ -126,6 +131,7 @@
     /**
      * remove the message at the cursor position
      */
+    @Override
     public synchronized void remove() {
         if (last != null) {
             last.decrementReferenceCount();
@@ -136,6 +142,7 @@
     /**
      * @return the number of pending messages
      */
+    @Override
     public synchronized int size() {
         return list.size();
     }
@@ -143,10 +150,16 @@
     /**
      * clear all pending messages
      */
+    @Override
     public synchronized void clear() {
+        for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
+            MessageReference ref = i.next();
+            ref.decrementReferenceCount();
+        }
         list.clear();
     }
 
+    @Override
     public synchronized void remove(MessageReference node) {
         for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
             MessageReference ref = i.next();
@@ -164,11 +177,19 @@
      * @param maxItems
      * @return a list of paged in messages
      */
+    @Override
     public LinkedList<MessageReference> pageInList(int maxItems) {
         return list;
     }
-    
+
+    @Override
     public boolean isTransient() {
         return true;
     }
+
+    @Override
+    public void destroy() throws Exception {
+        super.destroy();
+        clear();
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java?rev=912652&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java Mon
Feb 22 16:54:23 2010
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
+
+public class AMQ2616Test extends TestCase {
+    private static final int NUMBER = 2000;
+    private BrokerService brokerService;
+    private final ArrayList<Thread> threads = new ArrayList<Thread>();
+    String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:61616";
+    AtomicBoolean shutdown = new AtomicBoolean();
+    
+    public void testQueueResourcesReleased() throws Exception{
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
+        Connection tempConnection = fac.createConnection();
+        tempConnection.start();
+        Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue tempQueue = tempSession.createTemporaryQueue();
+        final MessageConsumer tempConsumer = tempSession.createConsumer(tempQueue);
+               
+        Connection testConnection = fac.createConnection();
+        long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+        Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer testProducer = testSession.createProducer(tempQueue);
+        byte[] payload = new byte[1024*4];
+        for (int i = 0; i < NUMBER; i++ ) {
+            BytesMessage msg = testSession.createBytesMessage();
+            msg.writeBytes(payload);
+            testProducer.send(msg);
+        }
+        long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+        assertFalse(startUsage==endUsage);
+        tempConnection.close();
+        Thread.sleep(1000);
+        endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+        assertEquals(startUsage,endUsage);
+    }
+    
+    public void testTopicResourcesReleased() throws Exception{
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
+        Connection tempConnection = fac.createConnection();
+        tempConnection.start();
+        Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic tempTopic = tempSession.createTemporaryTopic();
+        final MessageConsumer tempConsumer = tempSession.createConsumer(tempTopic);
+               
+        Connection testConnection = fac.createConnection();
+        long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+        Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer testProducer = testSession.createProducer(tempTopic);
+        byte[] payload = new byte[1024*4];
+        for (int i = 0; i < NUMBER; i++ ) {
+            BytesMessage msg = testSession.createBytesMessage();
+            msg.writeBytes(payload);
+            testProducer.send(msg);
+        }
+        long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+        assertFalse(startUsage==endUsage);
+        tempConnection.close();
+        Thread.sleep(1000);
+        endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
+        assertEquals(startUsage,endUsage);
+    }
+    
+    
+    @Override
+    protected void setUp() throws Exception {
+        // Start an embedded broker up.
+        brokerService = new BrokerService();
+
+        KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
+        adaptor.setEnableJournalDiskSyncs(false);
+        File file = new File("target/AMQ2616Test");
+        IOHelper.mkdirs(file);
+        IOHelper.deleteChildren(file);
+        adaptor.setDirectory(file);
+        brokerService.setPersistenceAdapter(adaptor);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry pe = new PolicyEntry();
+        pe.setMemoryLimit(10 * 1024 * 1024);
+        pe.setOptimizedDispatch(true);
+        pe.setProducerFlowControl(false);
+        pe.setExpireMessagesPeriod(1000);
+        pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
+        policyMap.put(new ActiveMQQueue(">"), pe);
+        brokerService.setDestinationPolicy(policyMap);
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024);
+        brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024);
+        brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
+        brokerService.start();
+        new ActiveMQQueue(getName());
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // Stop any running threads.
+        shutdown.set(true);
+        for (Thread t : threads) {
+            t.interrupt();
+            t.join();
+        }
+        brokerService.stop();
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java?rev=912652&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
Mon Feb 22 16:54:23 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs.amq1974;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+
+public class TryJmsClient
+{
+    private final BrokerService broker = new BrokerService();
+
+    public static void main(String[] args) throws Exception {
+        new TryJmsClient().start();
+    }
+
+    private void start() throws Exception {
+
+        broker.setUseJmx(false);
+        broker.setPersistent(true);
+        broker.setBrokerName("TestBroker");
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024);
+
+        KahaPersistenceAdapter persist = new KahaPersistenceAdapter();
+        persist.setDirectory(new File("/tmp/broker2"));
+        persist.setMaxDataFileLength(20 * 1024 * 1024);
+        broker.setPersistenceAdapter(persist);
+
+        String brokerUrl = "tcp://localhost:4501";
+        broker.addConnector(brokerUrl);
+
+        broker.start();
+
+        addNetworkBroker();
+
+        startUsageMonitor(broker);
+
+        startMessageSend();
+
+        synchronized(this) {
+            this.wait();
+        }
+    }
+
+    private void startUsageMonitor(final BrokerService brokerService) {
+        new Thread(new Runnable() {
+            public void run() {
+                while (true) {
+                    try {
+                        Thread.sleep(10000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+
+                    System.out.println("ActiveMQ memeory " + brokerService.getSystemUsage().getMemoryUsage().getPercentUsage()
+                            + " " + brokerService.getSystemUsage().getMemoryUsage().getUsage());
+                    System.out.println("ActiveMQ message store " + brokerService.getSystemUsage().getStoreUsage().getPercentUsage());
+                    System.out.println("ActiveMQ temp space " + brokerService.getSystemUsage().getTempUsage().getPercentUsage());
+                }
+            }
+        }).start();
+    }
+
+    private void addNetworkBroker() throws Exception {
+
+        DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
+        dnc.setNetworkTTL(1);
+        dnc.setBrokerName("TestBroker");
+        dnc.setName("Broker1Connector");
+        dnc.setDynamicOnly(true);
+
+        SimpleDiscoveryAgent discoveryAgent = new SimpleDiscoveryAgent();
+        String remoteUrl = "tcp://localhost:4500";
+        discoveryAgent.setServices(remoteUrl);
+
+        dnc.setDiscoveryAgent(discoveryAgent);
+
+        broker.addNetworkConnector(dnc);
+        dnc.start();
+    }
+
+    private void startMessageSend() {
+        new Thread(new MessageSend()).start();
+    }
+
+    private class MessageSend implements Runnable {
+        public void run() {
+            try {
+                String url = "vm://TestBroker";
+                ActiveMQConnection connection = ActiveMQConnection.makeConnection(url);
+                connection.setDispatchAsync(true);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Destination dest = session.createTopic("TestDestination");
+
+                MessageProducer producer = session.createProducer(dest);
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                for(int i = 0; i < 99999999; i++)  {
+                    TextMessage message = session.createTextMessage("test" + i);
+
+                    /*
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    */
+
+                    try {
+                        producer.send(message);
+                    } catch (Exception e ) {
+                        e.printStackTrace();
+                        System.out.println("TOTAL number of messages sent " + i);
+                        break;
+                    }
+
+                    if (i % 1000 == 0) {
+                        System.out.println("sent message " + message.getJMSMessageID());
+                    }
+                }
+            } catch (JMSException e) {
+                e.printStackTrace();
+            } catch (URISyntaxException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java?rev=912652&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
Mon Feb 22 16:54:23 2010
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs.amq1974;
+import java.io.File;
+import java.net.URISyntaxException;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+
+public class TryJmsManager {
+
+    private final BrokerService broker = new BrokerService();
+
+    public static void main(String[] args) throws Exception {
+        new TryJmsManager().start();
+    }
+
+    private void start() throws Exception {
+
+        broker.setUseJmx(false);
+        broker.setPersistent(true);
+        broker.setBrokerName("TestBroker");
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024);
+
+        KahaPersistenceAdapter persist = new KahaPersistenceAdapter();
+        persist.setDirectory(new File("/tmp/broker1"));
+        persist.setMaxDataFileLength(20 * 1024 * 1024);
+        broker.setPersistenceAdapter(persist);
+
+        String brokerUrl = "tcp://localhost:4500";
+        broker.addConnector(brokerUrl);
+
+        broker.start();
+
+        addNetworkBroker();
+
+        startUsageMonitor(broker);
+
+        startMessageConsumer();
+
+        synchronized(this) {
+            this.wait();
+        }
+    }
+
+    private void startUsageMonitor(final BrokerService brokerService) {
+        new Thread(new Runnable() {
+            public void run() {
+                while (true) {
+                    try {
+                        Thread.sleep(10000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+
+                    System.out.println("ActiveMQ memeory " + brokerService.getSystemUsage().getMemoryUsage().getPercentUsage()
+                            + " " + brokerService.getSystemUsage().getMemoryUsage().getUsage());
+                    System.out.println("ActiveMQ message store " + brokerService.getSystemUsage().getStoreUsage().getPercentUsage());
+                    System.out.println("ActiveMQ temp space " + brokerService.getSystemUsage().getTempUsage().getPercentUsage());
+                }
+            }
+        }).start();
+    }
+
+    private void addNetworkBroker() throws Exception {
+        DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
+        dnc.setNetworkTTL(1);
+        dnc.setBrokerName("TestBroker");
+        dnc.setName("Broker1Connector");
+        dnc.setDynamicOnly(true);
+
+        SimpleDiscoveryAgent discoveryAgent = new SimpleDiscoveryAgent();
+        String remoteUrl = "tcp://localhost:4501";
+        discoveryAgent.setServices(remoteUrl);
+
+        dnc.setDiscoveryAgent(discoveryAgent);
+
+        broker.addNetworkConnector(dnc);
+        dnc.start();
+    }
+
+    private void startMessageConsumer() throws JMSException, URISyntaxException {
+        String url = "vm://TestBroker";
+        ActiveMQConnection connection = ActiveMQConnection.makeConnection(url);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination dest = session.createTopic("TestDestination");
+
+        MessageConsumer consumer = session.createConsumer(dest);
+        consumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message message) {
+                try {
+                    System.out.println("got message " + message.getJMSMessageID());
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        );
+
+        connection.start();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message