Return-Path: Delivered-To: apmail-activemq-users-archive@www.apache.org Received: (qmail 31023 invoked from network); 19 Apr 2010 08:57:05 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 19 Apr 2010 08:57:05 -0000 Received: (qmail 46977 invoked by uid 500); 19 Apr 2010 08:57:04 -0000 Delivered-To: apmail-activemq-users-archive@activemq.apache.org Received: (qmail 46863 invoked by uid 500); 19 Apr 2010 08:57:02 -0000 Mailing-List: contact users-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@activemq.apache.org Delivered-To: mailing list users@activemq.apache.org Received: (qmail 46855 invoked by uid 99); 19 Apr 2010 08:57:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Apr 2010 08:57:02 +0000 X-ASF-Spam-Status: No, hits=0.0 required=10.0 tests=FREEMAIL_FROM,RCVD_IN_DNSWL_NONE,SPF_PASS,T_TO_NO_BRKTS_FREEMAIL X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of jamiemccrindle@gmail.com designates 209.85.218.225 as permitted sender) Received: from [209.85.218.225] (HELO mail-bw0-f225.google.com) (209.85.218.225) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Apr 2010 08:56:54 +0000 Received: by bwz25 with SMTP id 25so4395789bwz.8 for ; Mon, 19 Apr 2010 01:56:34 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=domainkey-signature:mime-version:received:in-reply-to:references :date:received:message-id:subject:from:to:content-type :content-transfer-encoding; bh=lBxmdlA/iwD3IvL6ioG2l/5ktGpGv3LEdE0Es4OIGc4=; b=qBz2Rjtgb6AqnzxiVk+3gOSIfoupchxTsnz7dmpAQq/siPQEx8kKlWdHOVj59D5mXB q3HT+lX3bqlx49gb2haDMACQJP8FRa7jzeUE1ajUs2MmkmrF5XHbz2Y4Gwku9OJF2Qbc 7ZelFYB/j2Iuuwjd2lgiewiSJxYy9g48K+ur8= DomainKey-Signature: a=rsa-sha1; c=nofws; d=gmail.com; s=gamma; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type:content-transfer-encoding; b=S+sqdwcm6/2CYWvy02JZdgbG7jQUx4l+d5cJUkPMtqxfLZVIWbmQTn0IbvkzWPAj5s jQ6iicvhGfe1fp/hox/gmqAf0LErtahv/G76FeXO+ERxelGLDQ0KfXWSV7mLlbQjREM7 EvXw+KNvW0FvTL54df7MxgbyUMJtCi7nQNT6w= MIME-Version: 1.0 Received: by 10.239.177.80 with HTTP; Mon, 19 Apr 2010 01:56:33 -0700 (PDT) In-Reply-To: <28282467.post@talk.nabble.com> References: <28282467.post@talk.nabble.com> Date: Mon, 19 Apr 2010 09:56:33 +0100 Received: by 10.239.166.195 with SMTP id c3mr442799hbe.57.1271667393810; Mon, 19 Apr 2010 01:56:33 -0700 (PDT) Message-ID: Subject: Re: Network of Brokers From: Jamie McCrindle To: users@activemq.apache.org Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable X-Virus-Checked: Checked by ClamAV on apache.org Awesome. Thanks for looking into that... you're right. I changed the transacted to false in the original test and it worked... (I also spotted a bug in the original test in that it was creating both sessions from the same connection). In case anyone is interested, the updated test is attached... *sigh* it's pretty obvious in retrospect that a consumer isn't going to see a message where the transaction hasn't been committed... Thanks again Mike, cheers, j. ------- package org.example.activemq; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; public class NetworkTest extends TestCase { public void testNetworkOfBrokers() throws Exception { BrokerService brokerService1 =3D null; BrokerService brokerService2 =3D null; try { { brokerService1 =3D new BrokerService(); brokerService1.setBrokerName("one"); brokerService1.setUseJmx(false); brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter()); brokerService1.addConnector("tcp://0.0.0.0:61616"); NetworkConnector network1 =3D brokerService1.addNetworkConnector("static:(tcp://localhost:51515)"); // NetworkConnector network1 =3D brokerService1.addNetworkConnector("multicast://default"); network1.setName("network1"); network1.setDynamicOnly(true); network1.setNetworkTTL(3); network1.setPrefetchSize(1); brokerService1.start(); } { brokerService2 =3D new BrokerService(); brokerService2.setBrokerName("two"); brokerService2.setUseJmx(false); brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter()); brokerService2.addConnector("tcp://0.0.0.0:51515"); NetworkConnector network2 =3D brokerService2.addNetworkConnector("static:(tcp://localhost:61616)"); // NetworkConnector network2 =3D brokerService2.addNetworkConnector("multicast://default"); network2.setName("network2"); network2.setDynamicOnly(true); network2.setNetworkTTL(3); network2.setPrefetchSize(1); brokerService2.start(); } ActiveMQConnectionFactory connectionFactory1 =3D new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:= 51515)?randomize=3Dfalse"); ActiveMQConnectionFactory connectionFactory2 =3D new ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:= 61616)?randomize=3Dfalse"); Connection connection1 =3D connectionFactory1.createConnection(); connection1.start(); Connection connection2 =3D connectionFactory2.createConnection(); connection2.start(); try { Session session1 =3D connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session2 =3D connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer1 =3D session1.createConsumer(new ActiveMQQueue("testingqueue")); MessageProducer producer2 =3D session2.createProducer(new ActiveMQQueue("testingqueue")); TextMessage message2 =3D session2.createTextMessage(); message2.setText("Hello World!"); producer2.send(message2); Message message1 =3D consumer1.receive(1000); assertNotNull(message1); System.out.println(message1); QueueBrowser browser =3D session2.createBrowser(new ActiveMQQueue("testingqueue")); Enumeration enumeration =3D browser.getEnumeration(); assertFalse(enumeration.hasMoreElements()); } finally { connection1.stop(); connection2.stop(); } } finally { try { if(brokerService1 !=3D null) { brokerService1.stop(); }} catch(Throwable t) { t.printStackTrace(); } try { if(brokerService2 !=3D null) { brokerService2.stop(); }} catch(Throwable t) { t.printStackTrace(); } } } } On Sun, Apr 18, 2010 at 3:10 PM, patzerbud wrote: > > > > dkfn wrote: >> >> :) It's the mailing list software conspiring, I tell you... adding it >> directly into the mail instead: >> > > OK, my first reply runs fine (i.e. without error) but didn't actually wor= k. > I noodled around with it a little more and offer the following: > > package org.apache.activemq.example; > > import java.util.Enumeration; > > import javax.jms.Connection; > import javax.jms.Message; > import javax.jms.MessageConsumer; > import javax.jms.MessageProducer; > import javax.jms.QueueBrowser; > import javax.jms.Session; > import javax.jms.TextMessage; > > import junit.framework.TestCase; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.broker.BrokerService; > import org.apache.activemq.command.ActiveMQQueue; > import org.apache.activemq.network.DiscoveryNetworkConnector; > import org.apache.activemq.network.NetworkConnector; > import org.apache.activemq.store.memory.MemoryPersistenceAdapter; > > > public class QueueTest extends TestCase { > > =A0 =A0 =A0 =A0private static final String TEST_QUEUE =3D "testQueue"; > =A0 =A0 =A0 =A0private static final String LOCAL_MQ1 =3D "tcp://localhost= :61616"; > =A0 =A0 =A0 =A0private static final String LOCAL_MQ2 =3D "tcp://localhost= :51515"; > > > =A0 =A0public void testNetworkOfBrokers() throws Exception { > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Broker broker1 =3D createBroker("one", 616= 16, 51515); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Broker broker2 =3D createBroker("two", 515= 15, 61616); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0pause(10, "sleeping to allow brokers to st= artup & connect to each > other..."); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.out.println("creating consumer"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Consumer consumer =3D createConsumer(LOCAL= _MQ2); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0pause(5, "sleeping to allow consumer to st= artup & connect to MQ..."); > > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.out.println("producing messages"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Connection connection =3D null; > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ActiveMQConnectionFactory = connectionFactory =3D new > ActiveMQConnectionFactory(LOCAL_MQ1); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection =3D connectionF= actory.createConnection(); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Session session =3D connec= tion.createSession(false, > Session.AUTO_ACKNOWLEDGE); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0MessageProducer producer = =3D session.createProducer(new > ActiveMQQueue(TEST_QUEUE)); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection.start(); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0for (int i=3D0; i<10; i++)= { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0TextMessag= e message =3D session.createTextMessage(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0message.se= tText("Hello World!"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0producer.s= end(message); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0pause(5, "sleeping to allo= w consumer to consume all messages..."); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0QueueBrowser browser =3D s= ession.createBrowser(new > ActiveMQQueue(TEST_QUEUE)); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Enumeration enumeration= =3D browser.getEnumeration(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0assertFalse(enumeration.ha= sMoreElements()); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0catch (Exception e) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0e.printStackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0finally { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (connec= tion !=3D null) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0connection.stop(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (Throwable t) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//t.printS= tackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (broker= 2 !=3D null) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0broker2.stop(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (Throwable t) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//t.printS= tackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (broker= 1 !=3D null) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0broker1.stop(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (Throwable t) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//t.printS= tackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0pause(2); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.out.println("All done!"); > =A0 =A0} > > =A0 =A0 =A0 =A0private void pause(int seconds) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0pause(seconds, null); > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private void pause(int seconds, String msg) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (msg !=3D null) System.out.println(msg)= ; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Thread.currentThread().sle= ep(seconds * 1000); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (InterruptedException e) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0; // ignore > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private Broker createBroker(String name, int listenerPort,= int > networkConnectorPort) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.out.println("creating broker "+name= ); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Thread brokerThread =3D null; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Broker broker =3D new Brok= er(name, listenerPort, networkConnectorPort); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerThread =3D new Threa= d(broker); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerThread.start(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0return broker; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (Exception ignoreMe) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ignoreMe.printStackTrace()= ; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0return null; > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private Consumer createConsumer(String url) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Thread thread =3D null; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Consumer consumer =3D new = Consumer(url); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0thread =3D new Thread(cons= umer); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0thread.start(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0return consumer; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (Exception ignoreMe) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ignoreMe.printStackTrace()= ; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0return null; > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private class Consumer implements Runnable { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0private final String url; // "tcp://localh= ost:51515" > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Consumer(String url) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0this.url =3D url; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0public void run() { > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Connection connection1 =3D= null; > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ActiveMQCo= nnectionFactory connectionFactory1 =3D new > ActiveMQConnectionFactory(url); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection= 1 =3D connectionFactory1.createConnection(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection= 1.start(); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Session se= ssion1 =3D connection1.createSession(true, > Session.AUTO_ACKNOWLEDGE); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0MessageCon= sumer consumer1 =3D session1.createConsumer(new > ActiveMQQueue(TEST_QUEUE)); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//for (int= i=3D0; i<1; i++) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0for (;;) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0Message message1 =3D consumer1.receive(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0assertNotNull(message1); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0System.out.println(message1); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0catch (Exception e) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0finally { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0if (connection1 !=3D null) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0 =A0 =A0 =A0 =A0connection1.stop(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (T= hrowable t) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0t.printStackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private static class Broker implements Runnable { > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0private String name; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0private int listenPort; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0private int connectorPort; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0private BrokerService brokerService =3D nu= ll; > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Broker(String name, int listenerPort, int = networkPort) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0this.name =3D name; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0listenPort =3D listenerPor= t; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connectorPort =3D networkP= ort; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0public void run() { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerServ= ice =3D new BrokerService(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerServ= ice.setBrokerName(name); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerServ= ice.setUseJmx(false); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerServ= ice.setPersistenceAdapter(new > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0MemoryPers= istenceAdapter()); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0NetworkCon= nector network2 =3D new DiscoveryNetworkConnector(new > java.net.URI("static:(tcp://localhost:" + connectorPort + ")")); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0network2.s= etName("network-" + name); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0network2.s= etDynamicOnly(false); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0network2.s= etNetworkTTL(2); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0network2.s= etPrefetchSize(1); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerServ= ice.addNetworkConnector(network2); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerServ= ice.addConnector("tcp://0.0.0.0:" + listenPort); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0brokerServ= ice.start(); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0catch (Exception e) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0e.printSta= ckTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0public void stop() { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (broker= Service !=3D null) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0brokerService.stop(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (Throwable t) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0t.printSta= ckTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0} > > } > > > I changed the order around a little bit for the producer. However, I thin= k > the main difference was this: > > Session session =3D connection.createSession(false, Session.AUTO_ACKNOWLE= DGE); > > This code will not work if you specify true for the first arg. I'm not su= re, > but I think it's because this example is using the in memory persistence > adapter... > > HTH, > > Mike L (aka patzerbud) > > -- > View this message in context: http://old.nabble.com/Network-of-Brokers-tp= 28269405p28282467.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > >