activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [03/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:34 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/activemq.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/activemq.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/activemq.xml
new file mode 100644
index 0000000..511d6d1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/activemq.xml
@@ -0,0 +1,51 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <!-- Allows us to use system properties as variables in this configuration file -->
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+	<property name="location" value="org/apache/activemq/memory/usage.properties"/> 
+  </bean>  
+
+  <broker xmlns="http://activemq.apache.org/schema/core" brokerName="${name}" persistent="false">
+   
+    <!-- Use the following to set the broker memory limit -->
+	<systemUsage>
+	    <systemUsage>
+			<memoryUsage>
+		    	<memoryUsage limit="${limit}" percentUsageMinDelta="${delta}"/>
+			</memoryUsage>
+			<storeUsage>
+		    	<storeUsage limit="1 gb" name="foo"/>
+			</storeUsage>
+			<tempUsage>	
+		    	<tempUsage limit="100 mb"/>
+			</tempUsage>
+	    </systemUsage>
+	</systemUsage>
+
+  </broker>
+
+</beans>
+<!-- END SNIPPET: example -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/DummyMessage.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/DummyMessage.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/DummyMessage.java
new file mode 100644
index 0000000..e5823d8
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/DummyMessage.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory.buffer;
+
+import org.apache.activemq.command.ActiveMQMessage;
+
+/**
+ * A message implementation which is useful for testing as we can spoof its size.
+ * <p>
+ * The value handed to the constructor is returned unchanged by {@link #getSize()}
+ * and is included in {@link #toString()}.
+ */
+public class DummyMessage extends ActiveMQMessage {
+
+    /** The spoofed size reported by {@link #getSize()}; fixed at construction time. */
+    private final int size;
+
+    /**
+     * Creates a message that reports the given size.
+     *
+     * @param size the size this message should claim to have
+     */
+    public DummyMessage(int size) {
+        this.size = size;
+    }
+
+    /**
+     * @return the size supplied to the constructor
+     */
+    @Override
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * @return a short description containing the message id and the spoofed size
+     */
+    @Override
+    public String toString() {
+        return "DummyMessage[id=" + getMessageId() + " size=" + size + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java
new file mode 100644
index 0000000..ea8f0a6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory.buffer;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.memory.buffer.MessageBuffer;
+import org.apache.activemq.memory.buffer.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for {@link MessageBuffer} tests.
+ * <p>
+ * Before every test, {@link #setUp()} asks the subclass for a buffer via
+ * {@link #createMessageBuffer()} and creates three queues ({@code qA},
+ * {@code qB}, {@code qC}) on it. Helpers are provided to log the queue
+ * contents and to build numbered {@link DummyMessage}s of a chosen size.
+ */
+public abstract class MemoryBufferTestSupport extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryBufferTestSupport.class);
+
+    // The fixture is created in setUp() only. It is deliberately NOT created in
+    // field initialisers: that would invoke the overridable createMessageBuffer()
+    // while the subclass is still being constructed, and the result would be
+    // thrown away by setUp() anyway.
+    protected MessageBuffer buffer;
+    protected MessageQueue qA;
+    protected MessageQueue qB;
+    protected MessageQueue qC;
+
+    /** Number of messages handed out by {@link #createMessage(int)} so far. */
+    protected int messageCount;
+
+    /**
+     * @return the buffer implementation under test
+     */
+    protected abstract MessageBuffer createMessageBuffer();
+
+    /**
+     * Creates a fresh buffer and three queues on it.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        buffer = createMessageBuffer();
+        qA = buffer.createMessageQueue();
+        qB = buffer.createMessageQueue();
+        qC = buffer.createMessageQueue();
+    }
+
+    /**
+     * Logs the contents of all three queues.
+     */
+    protected void dump() {
+        LOG.info("Dumping current state");
+        dumpQueue(qA, "A");
+        dumpQueue(qB, "B");
+        dumpQueue(qC, "C");
+    }
+
+    /**
+     * Logs the contents of a single queue.
+     *
+     * @param queue the queue whose list is logged
+     * @param name  label printed in front of the list
+     */
+    protected void dumpQueue(MessageQueue queue, String name) {
+        LOG.info("  {} = {}", name, queue.getList());
+    }
+
+    /**
+     * Creates a {@link DummyMessage} of the given size. The message gets the
+     * next value of {@link #messageCount} both as its {@code counter} property
+     * and as its JMS message id.
+     *
+     * @param size the size the message should report
+     * @return the newly created message
+     */
+    protected ActiveMQMessage createMessage(int size) throws Exception {
+        DummyMessage answer = new DummyMessage(size);
+        answer.setIntProperty("counter", ++messageCount);
+        answer.setJMSMessageID("" + messageCount);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java
new file mode 100644
index 0000000..2e771f2
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory.buffer;
+
+import org.apache.activemq.memory.buffer.MessageBuffer;
+import org.apache.activemq.memory.buffer.OrderBasedMessageBuffer;
+
+
+/**
+ * Tests {@link OrderBasedMessageBuffer} with a limit of 40, using messages of
+ * size 10. Once the buffer is full, every further add is expected to keep the
+ * total at 40; the per-queue sizes asserted below show which queue is expected
+ * to lose a message each time.
+ */
+public class OrderBasedMemoryBufferTest extends MemoryBufferTestSupport {
+
+    public void testSizeWorks() throws Exception {
+        // Fill the buffer exactly: A=1 message, B=2 messages, C=1 message.
+        qA.add(createMessage(10));
+        qB.add(createMessage(10));
+        qB.add(createMessage(10));
+        qC.add(createMessage(10));
+        dumpAndAssertSizes(10, 20, 10);
+
+        // A fifth message: qA is expected to be emptied.
+        qC.add(createMessage(10));
+        dumpAndAssertSizes(0, 20, 20);
+
+        // Adding to qB is expected to leave qB's size unchanged.
+        qB.add(createMessage(10));
+        dumpAndAssertSizes(0, 20, 20);
+
+        // Adding to qA is expected to shrink qB.
+        qA.add(createMessage(10));
+        dumpAndAssertSizes(10, 10, 20);
+    }
+
+    /**
+     * Logs the queues, then checks the buffer total is 40 and that each queue
+     * has the expected size.
+     */
+    private void dumpAndAssertSizes(int expectedA, int expectedB, int expectedC) {
+        dump();
+
+        assertEquals("buffer size", 40, buffer.getSize());
+        assertEquals("qA", expectedA, qA.getSize());
+        assertEquals("qB", expectedB, qB.getSize());
+        assertEquals("qC", expectedC, qC.getSize());
+    }
+
+    @Override
+    protected MessageBuffer createMessageBuffer() {
+        return new OrderBasedMessageBuffer(40);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java
new file mode 100644
index 0000000..ad02821
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory.buffer;
+
+import org.apache.activemq.memory.buffer.MessageBuffer;
+import org.apache.activemq.memory.buffer.SizeBasedMessageBuffer;
+
+
+/**
+ * Tests {@link SizeBasedMessageBuffer} with a limit of 40, using messages of
+ * size 10. After the buffer is full, one more add must keep the total at 40,
+ * and the assertions expect the message to be taken from qB, the queue that
+ * held the most data at that point.
+ */
+public class SizeBasedMessageBufferTest extends MemoryBufferTestSupport {
+
+    public void testSizeWorks() throws Exception {
+        // Fill the buffer exactly: A=1 message, B=2 messages, C=1 message.
+        qA.add(createMessage(10));
+        qB.add(createMessage(10));
+        qB.add(createMessage(10));
+        qC.add(createMessage(10));
+        dumpAndAssertSizes(10, 20, 10);
+
+        // now lets force an eviction
+        qC.add(createMessage(10));
+        dumpAndAssertSizes(10, 10, 20);
+    }
+
+    /**
+     * Logs the queues, then checks the buffer total is 40 and that each queue
+     * has the expected size.
+     */
+    private void dumpAndAssertSizes(int expectedA, int expectedB, int expectedC) {
+        dump();
+
+        assertEquals("buffer size", 40, buffer.getSize());
+        assertEquals("qA", expectedA, qA.getSize());
+        assertEquals("qB", expectedB, qB.getSize());
+        assertEquals("qC", expectedC, qC.getSize());
+    }
+
+    @Override
+    protected MessageBuffer createMessageBuffer() {
+        return new SizeBasedMessageBuffer(40);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/usage.properties
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/usage.properties b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/usage.properties
new file mode 100644
index 0000000..b5d33d1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/usage.properties
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+limit=1k
+name=test-broker
+delta=34
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
new file mode 100644
index 0000000..9f085b4
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
@@ -0,0 +1,629 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import javax.management.openmbean.CompositeData;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTestSupport;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class duplicates most of the functionality in {@link NetworkTestSupport}
+ * and {@link BrokerTestSupport} because more control was needed over how brokers
+ * and connectors are created. Also, this test asserts message counts via JMX on
+ * each broker.
+ */
+public class BrokerNetworkWithStuckMessagesTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerNetworkWithStuckMessagesTest.class);
+
+    private BrokerService localBroker;
+    private BrokerService remoteBroker;
+    private BrokerService secondRemoteBroker;
+    private DemandForwardingBridge bridge;
+
+    protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
+    protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+
+    protected TransportConnector connector;
+    protected TransportConnector remoteConnector;
+    protected TransportConnector secondRemoteConnector;
+
+    protected long idGenerator;
+    protected int msgIdGenerator;
+    protected int tempDestGenerator;
+    protected int maxWait = 4000;
+    protected String queueName = "TEST";
+
+    protected String amqDomain = "org.apache.activemq";
+
+    /**
+     * Starts three brokers (local, remote, second remote) and wires them into a
+     * chain with two duplex {@link DemandForwardingBridge}s:
+     * {@code local <-> remote <-> secondRemote}.
+     */
+    @Before
+    public void setUp() throws Exception {
+
+        // For those who want visual confirmation:
+        //   Uncomment the following to enable JMX support on a port number to use
+        //   Jconsole to view each broker. You will need to add some calls to
+        //   Thread.sleep() to be able to actually slow things down so that you
+        //   can manually see JMX attrs.
+//        System.setProperty("com.sun.management.jmxremote", "");
+//        System.setProperty("com.sun.management.jmxremote.port", "1099");
+//        System.setProperty("com.sun.management.jmxremote.authenticate", "false");
+//        System.setProperty("com.sun.management.jmxremote.ssl", "false");
+
+        // Create the local broker
+        createBroker();
+        // Create the remote broker
+        createRemoteBroker();
+
+        // Remove the activemq-data directory from the creation of the remote broker
+        FileUtils.deleteDirectory(new File("activemq-data"));
+
+        // Create a network bridge between the local and remote brokers so that
+        // demand-based forwarding can take place
+        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+        config.setBrokerName("local");
+        config.setDispatchAsync(false);
+        config.setDuplex(true);
+
+        Transport localTransport = createTransport();
+        Transport remoteTransport = createRemoteTransport();
+
+        // Create a network bridge between the two brokers
+        bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
+        bridge.setBrokerService(localBroker);
+        bridge.start();
+
+
+        // introduce a second broker/bridge on remote that should not get any messages because of networkTtl=1
+        // local <-> remote <-> secondRemote
+        createSecondRemoteBroker();
+        config = new NetworkBridgeConfiguration();
+        config.setBrokerName("remote");
+        config.setDuplex(true);
+
+        // For this second bridge the "local" side is the remote broker and the
+        // "remote" side is the second remote broker.
+        localTransport = createRemoteTransport();
+        remoteTransport = createSecondRemoteTransport();
+
+        // Create a network bridge between the two brokers
+        // NOTE(review): this overwrites the 'bridge' field, so the reference to the
+        // first (local <-> remote) bridge is lost and tearDown() only stops this
+        // second bridge. Consider keeping both references.
+        bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
+        bridge.setBrokerService(remoteBroker);
+        bridge.start();
+
+        waitForBridgeFormation();
+    }
+
+    protected void waitForBridgeFormation() throws Exception {
+        for (final BrokerService broker : brokers.values()) {
+            if (!broker.getNetworkConnectors().isEmpty()) {
+                // Max wait here is 30 secs
+                Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
+                    }});
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        bridge.stop();
+        localBroker.stop();
+        remoteBroker.stop();
+        secondRemoteBroker.stop();
+    }
+
+    /**
+     * Scenario, as exercised by the code below:
+     * <ol>
+     * <li>send 10 non-persistent messages to the local broker and verify them via JMX;</li>
+     * <li>consume and ack 5 of them through a consumer on the remote broker, checking
+     *     that each carries the local broker id in its broker path property;</li>
+     * <li>wait until the local queue is empty;</li>
+     * <li>create a consumer on the second remote broker and try to receive for 5 seconds;</li>
+     * <li>remove the remote consumer and its connection and wait until 5 messages
+     *     are visible on the remote broker;</li>
+     * <li>create a consumer on the local broker, receive and ack one message, and
+     *     wait until 4 messages are visible on the local broker;</li>
+     * <li>verify that a new consumer on the remote broker receives nothing;</li>
+     * <li>consume the remaining messages locally and verify both queues are empty.</li>
+     * </ol>
+     */
+    @Test(timeout=120000)
+    public void testBrokerNetworkWithStuckMessages() throws Exception {
+
+        int sendNumMessages = 10;
+        int receiveNumMessages = 5;
+
+        // Create a producer
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        // Create a destination on the local broker
+        ActiveMQDestination destinationInfo1 = null;
+
+        // Send 10 messages to the local broker. The destination is (re)requested on
+        // every iteration; the value from the last iteration is reused further down.
+        for (int i = 0; i < sendNumMessages; ++i) {
+            destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
+            connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
+        }
+
+        // Ensure that there are 10 messages on the local broker
+        Object[] messages = browseQueueWithJmx(localBroker);
+        assertEquals(sendNumMessages, messages.length);
+
+        // Create a synchronous consumer on the remote broker
+        StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        ActiveMQDestination destinationInfo2 =
+            createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
+        final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
+        connection2.send(consumerInfo2);
+
+        // Consume 5 of the messages from the remote broker and ack them.
+        for (int i = 0; i < receiveNumMessages; ++i) {
+            Message message1 = receiveMessage(connection2, 20000);
+            assertNotNull(message1);
+            LOG.info("on remote, got: " + message1.getMessageId());
+            connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
+            assertTrue("JMSActiveMQBrokerPath property present and correct",
+                    ((ActiveMQMessage)message1).getStringProperty(ActiveMQMessage.BROKER_PATH_PROPERTY).contains(localBroker.getBroker().getBrokerId().toString()));
+        }
+
+        // Ensure that there are zero messages on the local broker. This tells
+        // us that those messages have been prefetched to the remote broker
+        // where the demand exists.
+        // The result of waitFor is not checked here; the assertEquals after it
+        // performs the actual verification.
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(localBroker);
+               return 0 == result.length;
+            }
+        });
+        messages = browseQueueWithJmx(localBroker);
+        assertEquals(0, messages.length);
+
+        // try and pull the messages from remote, should be denied b/c on networkTtl
+        LOG.info("creating demand on second remote...");
+        StubConnection connection3 = createSecondRemoteConnection();
+        ConnectionInfo connectionInfo3 = createConnectionInfo();
+        SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
+        connection3.send(connectionInfo3);
+        connection3.send(sessionInfo3);
+        ActiveMQDestination destinationInfo3 =
+            createDestinationInfo(connection3, connectionInfo3, ActiveMQDestination.QUEUE_TYPE);
+        final ConsumerInfo consumerInfoS3 = createConsumerInfo(sessionInfo3, destinationInfo3);
+        connection3.send(consumerInfoS3);
+
+        // NOTE(review): a message arriving here is only logged and acked, not
+        // asserted against. A violation would surface indirectly through the
+        // "5 stuck messages" check below.
+        Message messageExceedingTtl = receiveMessage(connection3, 5000);
+        if (messageExceedingTtl != null) {
+            LOG.error("got message on Second remote: " + messageExceedingTtl);
+            connection3.send(createAck(consumerInfoS3, messageExceedingTtl, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
+        }
+
+        LOG.info("Closing consumer on remote");
+        // Close the consumer on the remote broker
+        connection2.send(consumerInfo2.createRemoveCommand());
+        // also close connection etc. so messages get dropped from the local consumer queue
+        connection2.send(connectionInfo2.createRemoveCommand());
+
+        // There should now be 5 messages stuck on the remote broker
+        assertTrue("correct stuck message count", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(remoteBroker);
+                return 5 == result.length;
+            }
+        }));
+        messages = browseQueueWithJmx(remoteBroker);
+        assertEquals(5, messages.length);
+
+        // Spot-check one of the browsed entries (index 1) for the local broker id.
+        assertTrue("can see broker path property",
+                ((String)((CompositeData)messages[1]).get("BrokerPath")).contains(localBroker.getBroker().getBrokerId().toString()));
+
+        LOG.info("Messages now stuck on remote");
+
+        // receive again on the origin broker
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationInfo1);
+        connection1.send(consumerInfo1);
+        LOG.info("create local consumer: " + consumerInfo1);
+
+        Message message1 = receiveMessage(connection1, 20000);
+        assertNotNull("Expect to get a replay as remote consumer is gone", message1);
+        connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
+        LOG.info("acked one message on origin, waiting for all messages to percolate back");
+
+        // One of the 5 stuck messages has just been acked locally, so 4 remain.
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(localBroker);
+               return 4 == result.length;
+            }
+        });
+        messages = browseQueueWithJmx(localBroker);
+        assertEquals(4, messages.length);
+
+        LOG.info("checking for messages on remote again");
+        // messages won't migrate back again till consumer closes
+        connection2 = createRemoteConnection();
+        connectionInfo2 = createConnectionInfo();
+        sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, destinationInfo2);
+        connection2.send(consumerInfo3);
+        message1 = receiveMessage(connection2, 20000);
+        assertNull("Messages have migrated back: " + message1, message1);
+
+        // Consume the last 4 messages from the local broker and ack them just
+        // to clean up the queue.
+        // The counter starts at 1 to account for the message already consumed
+        // locally above.
+        int counter = 1;
+        for (; counter < receiveNumMessages; counter++) {
+            message1 = receiveMessage(connection1);
+            // NOTE(review): the log statement tolerates a null message, but the
+            // ack on the next line is built from it unconditionally. Consider
+            // asserting non-null first.
+            LOG.info("local consume of: " + (message1 != null ? message1.getMessageId() : " null"));
+            connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
+        }
+        // Ensure that 5 messages were received
+        // NOTE(review): this only confirms the loop ran to completion; it does not
+        // by itself prove that each receive returned a message.
+        assertEquals(receiveNumMessages, counter);
+
+        // verify all messages consumed
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(remoteBroker);
+               return 0 == result.length;
+            }
+        });
+        messages = browseQueueWithJmx(remoteBroker);
+        assertEquals(0, messages.length);
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] result = browseQueueWithJmx(localBroker);
+               return 0 == result.length;
+            }
+        });
+        messages = browseQueueWithJmx(localBroker);
+        assertEquals(0, messages.length);
+
+        // Close the consumer on the remote broker
+        connection2.send(consumerInfo3.createRemoveCommand());
+
+        connection1.stop();
+        connection2.stop();
+        connection3.stop();
+    }
+
+    /**
+     * Creates, configures and starts the broker named "localhost" with JMX enabled and
+     * persistence switched off, stores it in {@code localBroker} and registers it in
+     * {@code brokers} under its broker name.
+     *
+     * @return the started local broker
+     * @throws Exception if the connector cannot be created or the broker fails to start
+     */
+    protected BrokerService createBroker() throws Exception {
+        localBroker = new BrokerService();
+        final BrokerService broker = localBroker;
+
+        broker.setBrokerName("localhost");
+        broker.setUseJmx(true);
+        broker.setPersistenceAdapter(null);
+        broker.setPersistent(false);
+
+        connector = createConnector();
+        broker.addConnector(connector);
+        configureBroker(broker);
+
+        broker.start();
+        broker.waitUntilStarted();
+
+        // The management connector port is assigned after the broker has started.
+        broker.getManagementContext().setConnectorPort(2221);
+
+        final String name = broker.getBrokerName();
+        brokers.put(name, broker);
+        return broker;
+    }
+
+    /**
+     * Installs a default destination policy on {@code broker}: the expire-messages period is
+     * set to 0 and a conditional network bridge filter with replay-when-no-consumers enabled
+     * is attached.
+     *
+     * @param broker the broker to configure
+     */
+    private void configureBroker(BrokerService broker) {
+        final ConditionalNetworkBridgeFilterFactory bridgeFilter = new ConditionalNetworkBridgeFilterFactory();
+        bridgeFilter.setReplayWhenNoConsumers(true);
+
+        final PolicyEntry entry = new PolicyEntry();
+        entry.setExpireMessagesPeriod(0);
+        entry.setNetworkBridgeFilterFactory(bridgeFilter);
+
+        final PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(entry);
+        broker.setDestinationPolicy(policies);
+    }
+
+    /**
+     * Creates, configures and starts the broker named "remotehost" with JMX enabled and
+     * persistence switched off, stores it in {@code remoteBroker} and registers it in
+     * {@code brokers} under its broker name.
+     *
+     * @return the started remote broker
+     * @throws Exception if the connector cannot be created or the broker fails to start
+     */
+    protected BrokerService createRemoteBroker() throws Exception {
+        remoteBroker = new BrokerService();
+        final BrokerService broker = remoteBroker;
+
+        broker.setBrokerName("remotehost");
+        broker.setUseJmx(true);
+        broker.setPersistenceAdapter(null);
+        broker.setPersistent(false);
+
+        remoteConnector = createRemoteConnector();
+        broker.addConnector(remoteConnector);
+        configureBroker(broker);
+
+        broker.start();
+        broker.waitUntilStarted();
+
+        // The management connector port is assigned after the broker has started.
+        broker.getManagementContext().setConnectorPort(2222);
+
+        final String name = broker.getBrokerName();
+        brokers.put(name, broker);
+        return broker;
+    }
+
+    /**
+     * Creates, configures and starts the broker named "secondRemotehost" with JMX disabled
+     * and persistence switched off, stores it in {@code secondRemoteBroker} and registers it
+     * in {@code brokers} under its broker name.
+     *
+     * @return the started second remote broker
+     * @throws Exception if the connector cannot be created or the broker fails to start
+     */
+    protected BrokerService createSecondRemoteBroker() throws Exception {
+        secondRemoteBroker = new BrokerService();
+        final BrokerService broker = secondRemoteBroker;
+
+        broker.setBrokerName("secondRemotehost");
+        broker.setUseJmx(false);
+        broker.setPersistenceAdapter(null);
+        broker.setPersistent(false);
+
+        secondRemoteConnector = createSecondRemoteConnector();
+        broker.addConnector(secondRemoteConnector);
+        configureBroker(broker);
+
+        broker.start();
+        broker.waitUntilStarted();
+
+        final String name = broker.getBrokerName();
+        brokers.put(name, broker);
+        return broker;
+    }
+
+    /**
+     * Opens a client transport to the connect URI of the local {@code connector}.
+     *
+     * @return the connected transport
+     * @throws Exception if the transport cannot be created
+     */
+    protected Transport createTransport() throws Exception {
+        return TransportFactory.connect(connector.getServer().getConnectURI());
+    }
+
+    /**
+     * Opens a client transport to the connect URI of {@code remoteConnector}.
+     *
+     * @return the connected transport
+     * @throws Exception if the transport cannot be created
+     */
+    protected Transport createRemoteTransport() throws Exception {
+        return TransportFactory.connect(remoteConnector.getServer().getConnectURI());
+    }
+
+    /**
+     * Opens a client transport to the connect URI of {@code secondRemoteConnector}.
+     *
+     * @return the connected transport
+     * @throws Exception if the transport cannot be created
+     */
+    protected Transport createSecondRemoteTransport() throws Exception {
+        return TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI());
+    }
+
+    /**
+     * Binds a transport server on {@link #getLocalURI()} and wraps it in a connector.
+     *
+     * @return the connector for the local broker
+     */
+    protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
+        final URI bindUri = new URI(getLocalURI());
+        return new TransportConnector(TransportFactory.bind(bindUri));
+    }
+
+    /**
+     * Binds a transport server on {@link #getRemoteURI()} and wraps it in a connector.
+     *
+     * @return the connector for the remote broker
+     */
+    protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
+        final URI bindUri = new URI(getRemoteURI());
+        return new TransportConnector(TransportFactory.bind(bindUri));
+    }
+
+    /**
+     * Binds a transport server on {@link #getSecondRemoteURI()} and wraps it in a connector.
+     *
+     * @return the connector for the second remote broker
+     */
+    protected TransportConnector createSecondRemoteConnector() throws Exception, IOException, URISyntaxException {
+        final URI bindUri = new URI(getSecondRemoteURI());
+        return new TransportConnector(TransportFactory.bind(bindUri));
+    }
+
+    /**
+     * @return the URI string on which the remote broker's connector is bound
+     */
+    protected String getRemoteURI() {
+        return "vm://remotehost";
+    }
+
+    /**
+     * @return the URI string on which the second remote broker's connector is bound
+     */
+    protected String getSecondRemoteURI() {
+        return "vm://secondRemotehost";
+    }
+
+    /**
+     * @return the URI string on which the local broker's connector is bound
+     */
+    protected String getLocalURI() {
+        return "vm://localhost";
+    }
+
+    /**
+     * Opens a stub connection to the local broker and records it in {@code connections}.
+     *
+     * @return the new stub connection
+     * @throws Exception if the underlying transport cannot be created
+     */
+    protected StubConnection createConnection() throws Exception {
+        final URI connectUri = connector.getServer().getConnectURI();
+        final StubConnection stub = new StubConnection(TransportFactory.connect(connectUri));
+        connections.add(stub);
+        return stub;
+    }
+
+    /**
+     * Opens a stub connection to the remote broker and records it in {@code connections}.
+     *
+     * @return the new stub connection
+     * @throws Exception if the underlying transport cannot be created
+     */
+    protected StubConnection createRemoteConnection() throws Exception {
+        final URI connectUri = remoteConnector.getServer().getConnectURI();
+        final StubConnection stub = new StubConnection(TransportFactory.connect(connectUri));
+        connections.add(stub);
+        return stub;
+    }
+
+    /**
+     * Opens a stub connection to the second remote broker and records it in
+     * {@code connections}.
+     *
+     * @return the new stub connection
+     * @throws Exception if the underlying transport cannot be created
+     */
+    protected StubConnection createSecondRemoteConnection() throws Exception {
+        final URI connectUri = secondRemoteConnector.getServer().getConnectURI();
+        final StubConnection stub = new StubConnection(TransportFactory.connect(connectUri));
+        connections.add(stub);
+        return stub;
+    }
+
+    /**
+     * Browses {@code queueName} on the given broker through a JMS {@code QueueBrowser} and
+     * returns a snapshot of the messages currently visible.
+     * <p>
+     * The connection is opened against the VM connector URI of {@code broker}, so the broker
+     * passed in is the one that gets browsed. The connection is always closed, which also
+     * closes the session and browser created from it.
+     *
+     * @param broker the broker whose queue should be browsed
+     * @return the browsed messages, possibly empty, never {@code null}
+     * @throws Exception if connecting to the broker or browsing the queue fails
+     */
+    @SuppressWarnings({ "unchecked", "unused" })
+    private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        Connection connection = connectionFactory.createConnection();
+        List<Message> browsed = new ArrayList<Message>();
+
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(queueName);
+            QueueBrowser browser = session.createBrowser(destination);
+            // JMS 1.1 exposes a raw Enumeration, hence the unchecked suppression above.
+            for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) {
+                browsed.add(enumn.nextElement());
+            }
+        }
+        finally {
+            // A single close is enough: closing a JMS connection closes its sessions too,
+            // and it cannot be skipped by a failing session.close().
+            connection.close();
+        }
+
+        Object[] messages = browsed.toArray();
+        LOG.info("+Browsed with JMS: " + messages.length);
+
+        return messages;
+    }
+
+    /**
+     * Browses {@code queueName} on the given broker through its JMX queue view MBean.
+     *
+     * @param broker the broker whose management context is used for the lookup
+     * @return the entries returned by the MBean's {@code browse()} operation
+     * @throws Exception if the object name is malformed or the JMX call fails
+     */
+    private Object[] browseQueueWithJmx(BrokerService broker) throws Exception {
+        final Hashtable<String, String> keys = new Hashtable<String, String>();
+        keys.put("type", "Broker");
+        keys.put("brokerName", broker.getBrokerName());
+        keys.put("destinationType", "Queue");
+        keys.put("destinationName", queueName);
+        final ObjectName queueObjectName = ObjectName.getInstance(amqDomain, keys);
+
+        final ManagementContext management = broker.getManagementContext();
+        final QueueViewMBean view =
+            (QueueViewMBean)management.newProxyInstance(queueObjectName, QueueViewMBean.class, true);
+
+        final Object[] browsed = view.browse();
+        LOG.info("+Browsed with JMX: " + browsed.length);
+        return browsed;
+    }
+
+    /**
+     * Creates a connection info with a fresh "connection:N" id, which is also used as the
+     * client id.
+     *
+     * @return the new connection info
+     */
+    protected ConnectionInfo createConnectionInfo() throws Exception {
+        final ConnectionId id = new ConnectionId("connection:" + (++idGenerator));
+        final ConnectionInfo connectionInfo = new ConnectionInfo();
+        connectionInfo.setConnectionId(id);
+        connectionInfo.setClientId(id.getValue());
+        return connectionInfo;
+    }
+
+    /**
+     * Creates a session info on the given connection using the next generated id.
+     *
+     * @param connectionInfo the connection the session belongs to
+     * @return the new session info
+     */
+    protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+        return new SessionInfo(connectionInfo, ++idGenerator);
+    }
+
+    /**
+     * Creates a producer info on the given session using the next generated id.
+     *
+     * @param sessionInfo the session the producer belongs to
+     * @return the new producer info
+     */
+    protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+        return new ProducerInfo(sessionInfo, ++idGenerator);
+    }
+
+    /**
+     * Creates a non-browser consumer info for {@code destination} with a prefetch size of
+     * 1000 and asynchronous dispatch turned off.
+     *
+     * @param sessionInfo the session the consumer belongs to
+     * @param destination the destination to consume from
+     * @return the new consumer info
+     */
+    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+        final ConsumerInfo consumer = new ConsumerInfo(sessionInfo, ++idGenerator);
+        consumer.setDestination(destination);
+        consumer.setBrowser(false);
+        consumer.setDispatchAsync(false);
+        consumer.setPrefetchSize(1000);
+        return consumer;
+    }
+
+    /**
+     * Builds an ADD destination info for a new temporary destination named
+     * "&lt;connectionId&gt;:&lt;n&gt;", where n is the next value of {@code tempDestGenerator}.
+     *
+     * @param connectionInfo the connection that owns the temporary destination
+     * @param destinationType the destination type byte passed to
+     *        {@code ActiveMQDestination.createDestination}
+     * @return the populated destination info
+     */
+    protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
+        final DestinationInfo tempInfo = new DestinationInfo();
+        tempInfo.setConnectionId(connectionInfo.getConnectionId());
+        tempInfo.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+
+        final String physicalName = tempInfo.getConnectionId() + ":" + (++tempDestGenerator);
+        tempInfo.setDestination(ActiveMQDestination.createDestination(physicalName, destinationType));
+        return tempInfo;
+    }
+
+    /**
+     * Resolves the destination used by a test. When {@code destinationType} has the
+     * {@code TEMP_MASK} bit set, a temporary destination is created and announced on
+     * {@code connection}; otherwise the destination named {@code queueName} is returned.
+     *
+     * @param connection the connection on which a temporary destination is announced
+     * @param connectionInfo1 the connection info that owns a temporary destination
+     * @param destinationType the destination type byte
+     * @return the destination to use
+     * @throws Exception if sending the destination info fails
+     */
+    protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception {
+        final boolean temporary = (destinationType & ActiveMQDestination.TEMP_MASK) != 0;
+        if (!temporary) {
+            return ActiveMQDestination.createDestination(queueName, destinationType);
+        }
+
+        final DestinationInfo tempInfo = createTempDestinationInfo(connectionInfo1, destinationType);
+        connection.send(tempInfo);
+        return tempInfo.getDestination();
+    }
+
+    /**
+     * Creates a test message and marks it persistent when {@code deliveryMode} equals
+     * {@code DeliveryMode.PERSISTENT}.
+     *
+     * @param producerInfo the producer the message id is derived from
+     * @param destination the destination of the message
+     * @param deliveryMode the JMS delivery mode
+     * @return the new message
+     */
+    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+        final boolean persistent = deliveryMode == DeliveryMode.PERSISTENT;
+        final Message created = createMessage(producerInfo, destination);
+        created.setPersistent(persistent);
+        return created;
+    }
+
+    /**
+     * Creates a non-persistent text message with a fresh message id and a fixed payload.
+     *
+     * @param producerInfo the producer the message id is derived from
+     * @param destination the destination of the message
+     * @return the new message
+     * @throws IllegalStateException if the new message refuses its payload, which would
+     *         otherwise go unnoticed and produce a message without a body
+     */
+    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+        message.setDestination(destination);
+        message.setPersistent(false);
+        try {
+            message.setText("Test Message Payload.");
+        } catch (MessageNotWriteableException e) {
+            // Fail loudly instead of swallowing the exception.
+            throw new IllegalStateException("Could not set the payload on a newly created text message", e);
+        }
+        return message;
+    }
+
+    /**
+     * Builds an acknowledgement for {@code msg} on behalf of the given consumer.
+     *
+     * @param consumerInfo the acknowledging consumer
+     * @param msg the message whose id and destination are put on the ack; must not be null
+     * @param count the message count to set on the ack
+     * @param ackType the ack type byte
+     * @return the populated ack
+     */
+    protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
+        final MessageAck acknowledgement = new MessageAck();
+        acknowledgement.setConsumerId(consumerInfo.getConsumerId());
+        acknowledgement.setAckType(ackType);
+        acknowledgement.setMessageCount(count);
+        acknowledgement.setDestination(msg.getDestination());
+        acknowledgement.setLastMessageId(msg.getMessageId());
+        return acknowledgement;
+    }
+
+    /**
+     * Receives the next dispatched message from {@code connection}, waiting up to
+     * {@code maxWait} milliseconds.
+     *
+     * @param connection the connection whose dispatch queue is polled
+     * @return the received message, or {@code null} if none was dispatched
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public Message receiveMessage(StubConnection connection) throws InterruptedException {
+        final long timeout = maxWait;
+        return receiveMessage(connection, timeout);
+    }
+
+    /**
+     * Receives the next dispatched message from {@code connection}, skipping any queued
+     * objects that are not {@code MessageDispatch} instances.
+     * <p>
+     * {@code timeout} bounds the whole call: time spent on skipped objects is deducted from
+     * the remaining wait, so a stream of non-dispatch objects cannot extend it indefinitely.
+     *
+     * @param connection the connection whose dispatch queue is polled
+     * @param timeout the maximum total time to wait, in milliseconds
+     * @return a copy of the dispatched message with its redelivery counter set, or
+     *         {@code null} if the wait timed out or a dispatch without a message arrived
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+        final long startNanos = System.nanoTime();
+        long remaining = timeout;
+
+        while (true) {
+            // Once the budget is used up, poll(0) still drains objects that are already queued.
+            Object o = connection.getDispatchQueue().poll(Math.max(0L, remaining), TimeUnit.MILLISECONDS);
+
+            if (o == null) {
+                return null;
+            }
+            if (o instanceof MessageDispatch) {
+
+                MessageDispatch dispatch = (MessageDispatch)o;
+                if (dispatch.getMessage() == null) {
+                    return null;
+                }
+                // Work on a copy so the caller's changes do not touch the dispatched message.
+                dispatch.setMessage(dispatch.getMessage().copy());
+                dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+                return dispatch.getMessage();
+            }
+
+            remaining = timeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
new file mode 100644
index 0000000..68681c6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.net.ServerSocketFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * Exercises a duplex network bridge from broker REMOTE to broker LOCAL when the connection
+ * is dropped just before LOCAL sends a response back, and verifies that LOCAL holds no
+ * left-over message after it is restarted.
+ * <p>
+ * LOCAL listens on a fixed NIO port through a transport that can be told to close itself
+ * instead of sending the next response (see {@code DebugTransportFilter}).
+ *
+ * @author x22koe
+ */
+public class CheckDuplicateMessagesOnDuplexTest {
+
+    private static final Logger log = LoggerFactory.getLogger(CheckDuplicateMessagesOnDuplexTest.class);
+    private BrokerService localBroker;
+    private BrokerService remoteBroker;
+    private ActiveMQConnectionFactory localFactory;
+    private ActiveMQConnectionFactory remoteFactory;
+    private Session localSession;
+    private MessageConsumer consumer;
+    private Session remoteSession;
+    private MessageProducer producer;
+    private Connection remoteConnection;
+    private Connection localConnection;
+    // Written by the transport server when a connection is accepted, read by the test thread.
+    private volatile DebugTransportFilter debugTransportFilter;
+    private boolean useLevelDB = false;
+
+    public CheckDuplicateMessagesOnDuplexTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() {
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+    }
+
+    @Before
+    public void setUp() {
+    }
+
+    /**
+     * Releases whatever the test left open. After a successful run everything is already
+     * closed and these calls are no-ops; after a failed assertion they stop the brokers and
+     * free the fixed port so later tests are not affected.
+     */
+    @After
+    public void tearDown() {
+        closeQuietly(localConnection);
+        closeQuietly(remoteConnection);
+        stopQuietly(remoteBroker);
+        stopQuietly(localBroker);
+    }
+
+    @Test
+    public void testConnectionLossBehaviorBeforeAckIsSent() throws Exception {
+        createBrokers();
+        localBroker.deleteAllMessages();
+        remoteBroker.deleteAllMessages();
+        startBrokers();
+        openConnections();
+
+        Thread.sleep(1000);
+        log.info("\n\n==============================================\nsend hello1\n");
+
+        // The filter only exists once REMOTE's bridge has connected to LOCAL.
+        assertNotNull("network bridge has not connected to LOCAL yet", debugTransportFilter);
+
+        // simulate network failure between REMOTE and LOCAL just before the reception response is sent back to REMOTE
+        debugTransportFilter.closeOnResponse = true;
+
+        producer.send(remoteSession.createTextMessage("hello1"));
+        Message msg = consumer.receive(30000);
+
+        assertNotNull("expected hello1", msg);
+        assertEquals("hello1", ((TextMessage) msg).getText());
+
+        Thread.sleep(1000);
+        log.info("\n\n------------------------------------------\nsend hello2\n");
+
+        producer.send(remoteSession.createTextMessage("hello2"));
+        msg = consumer.receive(30000);
+
+        assertNotNull("expected hello2", msg);
+        assertEquals("hello2", ((TextMessage) msg).getText());
+
+        closeLocalConnection();
+
+        Thread.sleep(1000);
+        log.info("\n\n------------------------------------------\nsend hello3\n");
+
+        openLocalConnection();
+
+        Thread.sleep(1000);
+
+        producer.send(remoteSession.createTextMessage("hello3"));
+        msg = consumer.receive(30000);
+
+        assertNotNull("expected hello3", msg);
+        assertEquals("hello3", ((TextMessage) msg).getText());
+
+        Thread.sleep(1000);
+        log.info("\n\n==============================================\n\n");
+
+        closeConnections();
+        stopBrokers();
+
+        // restart the local broker, which should be empty
+
+        Thread.sleep(1000);
+        log.info("\n\n##############################################\n\n");
+
+        createLocalBroker();
+        startLocalBroker();
+        openLocalConnection();
+
+        // this should not return the "hello1" message
+        msg = consumer.receive(1000);
+
+        closeLocalConnection();
+        stopLocalBroker();
+
+        assertNull(msg);
+    }
+
+    private void createBrokers() throws Exception {
+        createLocalBroker();
+        createRemoteBroker();
+    }
+
+    /**
+     * Creates (but does not start) broker LOCAL with a persistent store under
+     * {@code target/local} and a single NIO transport connector whose transports are wrapped
+     * in a {@code DebugTransportFilter}.
+     */
+    private void createLocalBroker() throws Exception {
+        localBroker = new BrokerService();
+        localBroker.setBrokerName("LOCAL");
+        localBroker.setUseJmx(true);
+        localBroker.setSchedulePeriodForDestinationPurge(5000);
+        ManagementContext managementContext = new ManagementContext();
+        managementContext.setCreateConnector(false);
+        localBroker.setManagementContext(managementContext);
+        PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/local");
+        localBroker.setPersistenceAdapter(persistenceAdapter);
+        List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
+        DebugTransportFactory tf = new DebugTransportFactory();
+        TransportServer transport = tf.doBind(URI.create("nio://127.0.0.1:23539"));
+        TransportConnector transportConnector = new TransportConnector(transport);
+        transportConnector.setName("tc");
+        transportConnector.setAuditNetworkProducers(true);
+        transportConnectors.add(transportConnector);
+        localBroker.setTransportConnectors(transportConnectors);
+    }
+
+    /**
+     * Creates (but does not start) broker REMOTE with a persistent store under
+     * {@code target/remote} and a duplex network connector pointing at LOCAL's port.
+     */
+    private void createRemoteBroker() throws Exception {
+        remoteBroker = new BrokerService();
+        remoteBroker.setBrokerName("REMOTE");
+        remoteBroker.setUseJmx(true);
+        remoteBroker.setSchedulePeriodForDestinationPurge(5000);
+        ManagementContext managementContext = new ManagementContext();
+        managementContext.setCreateConnector(false);
+        remoteBroker.setManagementContext(managementContext);
+        PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/remote");
+        remoteBroker.setPersistenceAdapter(persistenceAdapter);
+        List<NetworkConnector> networkConnectors = new ArrayList<NetworkConnector>();
+        DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
+        networkConnector.setName("to local");
+        // set maxInactivityDuration to 0, otherwise the broker restarts while you are in the debugger
+        networkConnector.setUri(URI.create("static://(tcp://127.0.0.1:23539?wireFormat.maxInactivityDuration=0)"));
+        networkConnector.setDuplex(true);
+        networkConnector.setAlwaysSyncSend(true);
+        networkConnector.setDecreaseNetworkConsumerPriority(false);
+        networkConnector.setPrefetchSize(1);
+        networkConnector.setCheckDuplicateMessagesOnDuplex(true);
+        networkConnectors.add(networkConnector);
+        remoteBroker.setNetworkConnectors(networkConnectors);
+    }
+
+    private void startBrokers() throws Exception {
+        startLocalBroker();
+        startRemoteBroker();
+    }
+
+    private void startLocalBroker() throws Exception {
+        localBroker.start();
+        localBroker.waitUntilStarted();
+    }
+
+    private void startRemoteBroker() throws Exception {
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+    }
+
+    private void openConnections() throws JMSException {
+        openLocalConnection();
+        openRemoteConnection();
+    }
+
+    /** Opens a VM connection to LOCAL and creates the consumer on "testqueue". */
+    private void openLocalConnection() throws JMSException {
+        localFactory = new ActiveMQConnectionFactory(localBroker.getVmConnectorURI());
+        localConnection = localFactory.createConnection();
+        localConnection.start();
+        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = localSession.createConsumer(localSession.createQueue("testqueue"));
+    }
+
+    /** Opens a VM connection to REMOTE and creates the producer on "testqueue". */
+    private void openRemoteConnection() throws JMSException {
+        remoteFactory = new ActiveMQConnectionFactory(remoteBroker.getVmConnectorURI());
+        remoteConnection = remoteFactory.createConnection();
+        remoteConnection.start();
+        remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = remoteSession.createProducer(remoteSession.createQueue("testqueue"));
+    }
+
+    private void closeConnections() throws JMSException {
+        closeLocalConnection();
+        closeRemoteConnection();
+    }
+
+    private void closeLocalConnection() throws JMSException {
+        localConnection.close();
+    }
+
+    private void closeRemoteConnection() throws JMSException {
+        remoteConnection.close();
+    }
+
+    private void stopBrokers() throws Exception {
+        stopRemoteBroker();
+        stopLocalBroker();
+    }
+
+    private void stopLocalBroker() throws Exception {
+        localBroker.stop();
+        localBroker.waitUntilStopped();
+    }
+
+    private void stopRemoteBroker() throws Exception {
+        remoteBroker.stop();
+        remoteBroker.waitUntilStopped();
+    }
+
+    /** Best-effort close used during tear-down; failures are logged, not rethrown. */
+    private void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (JMSException ex) {
+            log.warn("could not close connection during tear-down", ex);
+        }
+    }
+
+    /** Best-effort stop used during tear-down; failures are logged, not rethrown. */
+    private void stopQuietly(BrokerService broker) {
+        if (broker == null) {
+            return;
+        }
+        try {
+            broker.stop();
+            broker.waitUntilStopped();
+        } catch (Exception ex) {
+            log.warn("could not stop broker {} during tear-down", broker.getBrokerName(), ex);
+        }
+    }
+
+    /** Picks the LevelDB or KahaDB adapter depending on {@code useLevelDB}. */
+    private PersistenceAdapter persistanceAdapterFactory(String path) {
+        if (useLevelDB) {
+            return persistanceAdapterFactory_LevelDB(path);
+        } else {
+            return persistanceAdapterFactory_KahaDB(path);
+        }
+    }
+
+    private PersistenceAdapter persistanceAdapterFactory_KahaDB(String path) {
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
+        kahaDBPersistenceAdapter.setDirectory(new File(path));
+        kahaDBPersistenceAdapter.setIgnoreMissingJournalfiles(true);
+        kahaDBPersistenceAdapter.setCheckForCorruptJournalFiles(true);
+        kahaDBPersistenceAdapter.setChecksumJournalFiles(true);
+        return kahaDBPersistenceAdapter;
+    }
+
+    private PersistenceAdapter persistanceAdapterFactory_LevelDB(String path) {
+        LevelDBPersistenceAdapter levelDBPersistenceAdapter = new LevelDBPersistenceAdapter();
+        levelDBPersistenceAdapter.setDirectory(new File(path));
+        return levelDBPersistenceAdapter;
+    }
+
+    /** NIO transport factory whose server hands out debug-wrapped transports. */
+    private class DebugTransportFactory extends NIOTransportFactory {
+
+        @Override
+        protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory)
+                throws IOException, URISyntaxException {
+            return new DebugTransportServer(this, location, serverSocketFactory);
+        }
+    }
+
+    /**
+     * Transport server that wraps every accepted connection in a {@code DebugTransportFilter}
+     * and publishes the most recent filter in the enclosing test's field.
+     */
+    private class DebugTransportServer extends TcpTransportServer {
+
+        public DebugTransportServer(TcpTransportFactory transportFactory, URI location,
+                                    ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+            super(transportFactory, location, serverSocketFactory);
+        }
+
+        @Override
+        protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+            Transport transport;
+            transport = new NIOTransport(format, socket);
+            debugTransportFilter = new DebugTransportFilter(transport);
+            return debugTransportFilter;
+        }
+    }
+
+    /**
+     * Transport filter that, when armed through {@code closeOnResponse}, stops the underlying
+     * transport instead of sending the next {@code Response}, then disarms itself.
+     */
+    private class DebugTransportFilter extends TransportFilter {
+
+        // Set by the test thread, read on the transport's sending thread.
+        volatile boolean closeOnResponse = false;
+
+        public DebugTransportFilter(Transport next) {
+            super(next);
+        }
+
+        @Override
+        public void oneway(Object command) throws IOException {
+            if (closeOnResponse && command instanceof Response) {
+                closeOnResponse = false;
+                log.warn("\n\nclosing connection before response is sent\n\n");
+                try {
+                    ((NIOTransport) next).stop();
+                } catch (Exception ex) {
+                    log.error("couldn't stop niotransport", ex);
+                }
+                // don't send response
+                return;
+            }
+            super.oneway(command);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
new file mode 100644
index 0000000..58af6dc
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+public class CompressionOverNetworkTest {
+
+    protected static final int RECEIVE_TIMEOUT_MILLS = 10000;
+    protected static final int MESSAGE_COUNT = 10;
+    private static final Logger LOG = LoggerFactory.getLogger(CompressionOverNetworkTest.class);
+
+    protected AbstractApplicationContext context;
+    protected Connection localConnection;
+    protected Connection remoteConnection;
+    protected BrokerService localBroker;
+    protected BrokerService remoteBroker;
+    protected Session localSession;
+    protected Session remoteSession;
+    protected ActiveMQDestination included;
+
+    @Test
+    public void testCompressedOverCompressedNetwork() throws Exception {
+
+        ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection;
+        localAmqConnection.setUseCompression(true);
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StringBuilder payload = new StringBuilder("test-");
+        for (int i = 0; i < 100; ++i) {
+            payload.append(UUID.randomUUID().toString());
+        }
+
+        Message test = localSession.createTextMessage(payload.toString());
+        producer.send(test);
+        Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
+        assertNotNull(msg);
+        ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
+        assertTrue(message.isCompressed());
+        assertEquals(payload.toString(), message.getText());
+    }
+
+    @Test
+    public void testTextMessageCompression() throws Exception {
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StringBuilder payload = new StringBuilder("test-");
+        for (int i = 0; i < 100; ++i) {
+            payload.append(UUID.randomUUID().toString());
+        }
+
+        Message test = localSession.createTextMessage(payload.toString());
+        producer.send(test);
+        Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
+        assertNotNull(msg);
+        ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
+        assertTrue(message.isCompressed());
+        assertEquals(payload.toString(), message.getText());
+    }
+
+    /**
+     * Sends a bytes message across the network bridge and checks that the remote
+     * copy is compressed, smaller on the wire than the raw payload, and decodes
+     * back to exactly the bytes that were written.
+     */
+    @Test
+    public void testBytesMessageCompression() throws Exception {
+        MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
+        MessageProducer localProducer = localSession.createProducer(included);
+        localProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StringBuilder builder = new StringBuilder("test-");
+        int appended = 0;
+        while (appended < 100) {
+            builder.append(UUID.randomUUID().toString());
+            appended++;
+        }
+        byte[] expected = builder.toString().getBytes("UTF-8");
+
+        BytesMessage outgoing = localSession.createBytesMessage();
+        outgoing.writeBytes(expected);
+        localProducer.send(outgoing);
+
+        Message received = remoteConsumer.receive(RECEIVE_TIMEOUT_MILLS);
+        assertNotNull(received);
+        ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) received;
+        assertTrue(bytesMessage.isCompressed());
+        // the stored content must be shorter than the uncompressed payload
+        assertTrue(bytesMessage.getContent().getLength() < expected.length);
+
+        byte[] actual = new byte[expected.length];
+        assertEquals(expected.length, bytesMessage.readBytes(actual));
+        // a second read must report end of stream
+        assertEquals(-1, bytesMessage.readBytes(actual));
+
+        int index = 0;
+        while (index < expected.length) {
+            assertEquals(expected[index], actual[index]);
+            index++;
+        }
+    }
+
+    @Test
+    public void testStreamMessageCompression() throws Exception {
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StreamMessage test = localSession.createStreamMessage();
+
+        for (int i = 0; i < 100; ++i) {
+            test.writeString("test string: " + i);
+        }
+
+        producer.send(test);
+        Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
+        assertNotNull(msg);
+        ActiveMQStreamMessage message = (ActiveMQStreamMessage) msg;
+        assertTrue(message.isCompressed());
+
+        for (int i = 0; i < 100; ++i) {
+            assertEquals("test string: " + i, message.readString());
+        }
+    }
+
+    @Test
+    public void testMapMessageCompression() throws Exception {
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        MapMessage test = localSession.createMapMessage();
+
+        for (int i = 0; i < 100; ++i) {
+            test.setString(Integer.toString(i), "test string: " + i);
+        }
+
+        producer.send(test);
+        Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
+        assertNotNull(msg);
+        ActiveMQMapMessage message = (ActiveMQMapMessage) msg;
+        assertTrue(message.isCompressed());
+
+        for (int i = 0; i < 100; ++i) {
+            assertEquals("test string: " + i, message.getString(Integer.toString(i)));
+        }
+    }
+
+    @Test
+    public void testObjectMessageCompression() throws Exception {
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StringBuilder payload = new StringBuilder("test-");
+        for (int i = 0; i < 100; ++i) {
+            payload.append(UUID.randomUUID().toString());
+        }
+
+        Message test = localSession.createObjectMessage(payload.toString());
+        producer.send(test);
+        Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
+        assertNotNull(msg);
+        ActiveMQObjectMessage message = (ActiveMQObjectMessage) msg;
+        assertTrue(message.isCompressed());
+        assertEquals(payload.toString(), message.getObject());
+    }
+
+    /**
+     * Blocks (via {@link Wait#waitFor}) until the first bridge of the broker's
+     * first network connector holds a demand subscription for {@code destination}
+     * whose size is at least {@code min}; fails the test otherwise.
+     */
+    private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
+        Wait.Condition demandRegistered = new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
+                if (bridges.length == 0) {
+                    // bridge not established yet
+                    return false;
+                }
+                LOG.info(brokerService + " bridges "  + Arrays.toString(bridges));
+
+                DemandForwardingBridgeSupport firstBridge = (DemandForwardingBridgeSupport) bridges[0];
+                ConcurrentHashMap<ConsumerId, DemandSubscription> localSubs = firstBridge.getLocalSubscriptionMap();
+                LOG.info(brokerService + " bridge "  + firstBridge + ", localSubs: " + localSubs);
+
+                // only the first subscription matching the destination decides the outcome
+                for (DemandSubscription candidate : localSubs.values()) {
+                    if (candidate.getLocalInfo().getDestination().equals(destination)) {
+                        LOG.info(brokerService + " DemandSubscription "  + candidate + ", size: " + candidate.size());
+                        return candidate.size() >= min;
+                    }
+                }
+                return false;
+            }
+        };
+        assertTrue("Internal bridge consumers registered in time", Wait.waitFor(demandRegistered));
+    }
+
+    /**
+     * Runs before each test: performs the full broker/connection set-up and
+     * requests that both brokers delete all messages on startup.
+     */
+    @Before
+    public void setUp() throws Exception {
+        doSetUp(true);
+    }
+
+    /** Runs after each test and delegates to {@link #doTearDown()}. */
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    /**
+     * Closes both client connections and stops both brokers.
+     * <p>
+     * Every resource is released even if an earlier step throws, and fields that
+     * were never initialised (set-up failed part-way) are skipped, so a failing
+     * test cannot leave a broker running for the tests that follow.
+     *
+     * @throws Exception the failure raised while closing a connection or stopping
+     *                   a broker; remaining resources are still released first
+     */
+    protected void doTearDown() throws Exception {
+        try {
+            if (localConnection != null) {
+                localConnection.close();
+            }
+        } finally {
+            try {
+                if (remoteConnection != null) {
+                    remoteConnection.close();
+                }
+            } finally {
+                try {
+                    if (localBroker != null) {
+                        localBroker.stop();
+                    }
+                } finally {
+                    if (remoteBroker != null) {
+                        remoteBroker.stop();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts the local and remote brokers, opens one started JMS connection to
+     * each over its VM connector URI, and creates the auto-acknowledge sessions
+     * and the topic used by the tests.
+     *
+     * @param deleteAllMessages passed to
+     *        {@code setDeleteAllMessagesOnStartup} on both brokers
+     */
+    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+        // local broker is brought up fully before the remote one is created
+        localBroker = createLocalBroker();
+        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        remoteBroker = createRemoteBroker();
+        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+        // producer side: synchronous sends, synchronous dispatch
+        URI localURI = localBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
+        fac.setAlwaysSyncSend(true);
+        fac.setDispatchAsync(false);
+        localConnection = fac.createConnection();
+        localConnection.setClientID("clientId");
+        localConnection.start();
+        // consumer side: factory defaults
+        URI remoteURI = remoteBroker.getVmConnectorURI();
+        fac = new ActiveMQConnectionFactory(remoteURI);
+        remoteConnection = fac.createConnection();
+        remoteConnection.setClientID("clientId");
+        remoteConnection.start();
+        included = new ActiveMQTopic("include.test.bar");
+        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /** @return classpath location of the remote broker's XML configuration */
+    protected String getRemoteBrokerURI() {
+        return "org/apache/activemq/network/remoteBroker.xml";
+    }
+
+    /** @return classpath location of the local broker's XML configuration */
+    protected String getLocalBrokerURI() {
+        return "org/apache/activemq/network/localBroker.xml";
+    }
+
+    /**
+     * Builds a broker from the XML configuration found at the given classpath
+     * location and switches on compression for each of its network connectors.
+     *
+     * @param uri classpath location of the broker XML configuration
+     * @return the broker obtained from the factory bean, with
+     *         {@code useCompression} set on every network connector
+     * @throws Exception if the factory bean cannot be initialised
+     */
+    protected BrokerService createBroker(String uri) throws Exception {
+        // the resource and factory are created exactly once
+        Resource resource = new ClassPathResource(uri);
+        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+        factory.afterPropertiesSet();
+        BrokerService result = factory.getBroker();
+
+        for (NetworkConnector connector : result.getNetworkConnectors()) {
+            connector.setUseCompression(true);
+        }
+
+        return result;
+    }
+
+    /** Creates the local broker from the configuration named by {@link #getLocalBrokerURI()}. */
+    protected BrokerService createLocalBroker() throws Exception {
+        return createBroker(getLocalBrokerURI());
+    }
+
+    /** Creates the remote broker from the configuration named by {@link #getRemoteBrokerURI()}. */
+    protected BrokerService createRemoteBroker() throws Exception {
+        return createBroker(getRemoteBrokerURI());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
new file mode 100644
index 0000000..087ddd0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+import java.util.Arrays;
+
+
+public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
+
+    private DemandForwardingBridge bridge;
+
+    private StubConnection producerConnection;
+
+    private ProducerInfo producerInfo;
+
+    private StubConnection consumerConnection;
+
+    private SessionInfo consumerSessionInfo;
+
+    /**
+     * Excludes the topic wildcard {@code OTHER.>} while dynamically including the
+     * queue {@code TEST}; expects the queue to be forwarded and the excluded
+     * topic not to be.
+     */
+    public void testWildcardOnExcludedDestination() throws Exception {
+        ActiveMQDestination excludedTopics =
+                ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.TOPIC_TYPE);
+        ActiveMQDestination includedQueue =
+                ActiveMQDestination.createDestination("TEST", ActiveMQDestination.QUEUE_TYPE);
+
+        NetworkBridgeConfiguration bridgeConfig = getDefaultBridgeConfiguration();
+        bridgeConfig.setExcludedDestinations(Arrays.asList(excludedTopics));
+        bridgeConfig.setDynamicallyIncludedDestinations(Arrays.asList(includedQueue));
+        configureAndStartBridge(bridgeConfig);
+
+        assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    /**
+     * Excludes the queue wildcard {@code OTHER.>} and the queue {@code TEST.X1}
+     * while dynamically including the queue {@code TEST.X2}; expects only
+     * {@code TEST.X2} to be forwarded.
+     */
+    public void testWildcardOnTwoExcludedDestination() throws Exception {
+        ActiveMQDestination excludedWildcard =
+                ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE);
+        ActiveMQDestination excludedQueue =
+                ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+        ActiveMQDestination includedQueue =
+                ActiveMQDestination.createDestination("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+
+        NetworkBridgeConfiguration bridgeConfig = getDefaultBridgeConfiguration();
+        bridgeConfig.setExcludedDestinations(Arrays.asList(excludedWildcard, excludedQueue));
+        bridgeConfig.setDynamicallyIncludedDestinations(Arrays.asList(includedQueue));
+        configureAndStartBridge(bridgeConfig);
+
+        assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+    }
+
+
+    /**
+     * Dynamically includes the queue wildcard {@code OTHER.>} and the queue
+     * {@code TEST.X2}; expects messages on both {@code OTHER.X1} and
+     * {@code TEST.X2} to be forwarded.
+     */
+    public void testWildcardOnDynamicallyIncludedDestination() throws Exception {
+        ActiveMQDestination includedWildcard =
+                ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE);
+        ActiveMQDestination includedQueue =
+                ActiveMQDestination.createDestination("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+
+        NetworkBridgeConfiguration bridgeConfig = getDefaultBridgeConfiguration();
+        bridgeConfig.setDynamicallyIncludedDestinations(Arrays.asList(includedWildcard, includedQueue));
+        configureAndStartBridge(bridgeConfig);
+
+        assertReceiveMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    /**
+     * Excludes every topic ({@code >}) and dynamically includes every queue
+     * ({@code >}); expects a queue named {@code TEST} to be forwarded while a
+     * topic of the same name is not.
+     */
+    public void testDistinctTopicAndQueue() throws Exception {
+        ActiveMQDestination allTopics =
+                ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
+        ActiveMQDestination allQueues =
+                ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
+
+        NetworkBridgeConfiguration bridgeConfig = getDefaultBridgeConfiguration();
+        bridgeConfig.setExcludedDestinations(Arrays.asList(allTopics));
+        bridgeConfig.setDynamicallyIncludedDestinations(Arrays.asList(allQueues));
+        configureAndStartBridge(bridgeConfig);
+
+        assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("TEST", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    /**
+     * Excludes the topic wildcards {@code OTHER.>} and {@code TEST.*} while
+     * dynamically including the queue {@code TEST.X1}; expects the queue to be
+     * forwarded and topics under {@code OTHER.} not to be.
+     */
+    public void testListOfExcludedDestinationWithWildcard() throws Exception {
+        ActiveMQDestination excludedOther =
+                ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.TOPIC_TYPE);
+        ActiveMQDestination excludedTest =
+                ActiveMQDestination.createDestination("TEST.*", ActiveMQDestination.TOPIC_TYPE);
+        ActiveMQDestination includedQueue =
+                ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+
+        NetworkBridgeConfiguration bridgeConfig = getDefaultBridgeConfiguration();
+        bridgeConfig.setExcludedDestinations(Arrays.asList(excludedOther, excludedTest));
+        bridgeConfig.setDynamicallyIncludedDestinations(Arrays.asList(includedQueue));
+        configureAndStartBridge(bridgeConfig);
+
+        assertReceiveMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+        // NOTE(review): no topic matching the excluded "TEST.*" pattern is probed here;
+        // both negative checks use "OTHER." topics -- confirm whether that is intended.
+        assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
+        assertReceiveNoMessageOn("OTHER.T2", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    /**
+     * Sends one message to the named destination through the producer
+     * connection and asserts that a consumer created on the consumer connection
+     * receives it.
+     */
+    private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception,
+            InterruptedException {
+        ActiveMQDestination target = ActiveMQDestination.createDestination(destinationName, destinationType);
+
+        // NOTE(review): destinationType is passed as createMessage's third argument;
+        // verify that this overload expects a destination type and not a delivery mode.
+        producerConnection.send(createMessage(producerInfo, target, destinationType));
+
+        // the message must show up on the consumer side
+        assertNotNull(createConsumerAndReceiveMessage(target));
+    }
+
+    /**
+     * Sends one message to the named destination through the producer
+     * connection and asserts that a consumer created on the consumer connection
+     * receives nothing.
+     */
+    private void assertReceiveNoMessageOn(String destinationName, byte destinationType) throws Exception,
+            InterruptedException {
+        ActiveMQDestination target = ActiveMQDestination.createDestination(destinationName, destinationType);
+
+        // NOTE(review): destinationType is passed as createMessage's third argument;
+        // verify that this overload expects a destination type and not a delivery mode.
+        producerConnection.send(createMessage(producerInfo, target, destinationType));
+
+        // the message must NOT show up on the consumer side
+        assertNull(createConsumerAndReceiveMessage(target));
+    }
+
+    /**
+     * Registers a consumer for {@code destination} on the consumer connection
+     * and returns whatever {@code receiveMessage} yields for that connection.
+     */
+    private Message createConsumerAndReceiveMessage(ActiveMQDestination destination) throws Exception {
+        // registering the consumer is what should pull the message across
+        consumerConnection.send(createConsumerInfo(consumerSessionInfo, destination));
+        return receiveMessage(consumerConnection);
+    }
+
+    /**
+     * Runs the inherited set-up, then opens a producer connection (connection,
+     * session and producer registered) and a consumer connection created with
+     * {@code createRemoteConnection()} (connection and session registered).
+     * The bridge itself is started by each test.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        producerConnection = createConnection();
+        ConnectionInfo producerConnectionInfo = createConnectionInfo();
+        SessionInfo producerSessionInfo = createSessionInfo(producerConnectionInfo);
+        producerInfo = createProducerInfo(producerSessionInfo);
+        producerConnection.send(producerConnectionInfo);
+        producerConnection.send(producerSessionInfo);
+        producerConnection.send(producerInfo);
+
+        consumerConnection = createRemoteConnection();
+        ConnectionInfo consumerConnectionInfo = createConnectionInfo();
+        consumerSessionInfo = createSessionInfo(consumerConnectionInfo);
+        consumerConnection.send(consumerConnectionInfo);
+        consumerConnection.send(consumerSessionInfo);
+    }
+
+    /**
+     * Stops the bridge, if a test started one, and always runs the inherited
+     * tear-down, even when stopping the bridge fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            // bridge is only assigned by configureAndStartBridge(), so it may be null
+            if (bridge != null) {
+                bridge.stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** Builds the JUnit 3 suite for this class using the {@code suite(Class)} helper. */
+    public static Test suite() {
+        return suite(DemandForwardingBridgeFilterTest.class);
+    }
+
+    /** Command-line entry point: runs {@link #suite()} with the JUnit text runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /**
+     * @return a fresh bridge configuration with broker name {@code "local"} and
+     *         asynchronous dispatch disabled
+     */
+    public NetworkBridgeConfiguration getDefaultBridgeConfiguration() {
+        NetworkBridgeConfiguration defaults = new NetworkBridgeConfiguration();
+        defaults.setDispatchAsync(false);
+        defaults.setBrokerName("local");
+        return defaults;
+    }
+
+    /**
+     * Creates the bridge for the given configuration, copies the configuration's
+     * dynamically-included, excluded and statically-included destination lists
+     * onto it as arrays, and starts it.
+     */
+    private void configureAndStartBridge(NetworkBridgeConfiguration configuration) throws Exception {
+        ActiveMQDestination[] dynamicallyIncluded = configuration.getDynamicallyIncludedDestinations().toArray(
+                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+        ActiveMQDestination[] excluded = configuration.getExcludedDestinations().toArray(
+                new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+        ActiveMQDestination[] staticallyIncluded = configuration.getStaticallyIncludedDestinations().toArray(
+                new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+
+        bridge = new DemandForwardingBridge(configuration, createTransport(), createRemoteTransport());
+        bridge.setBrokerService(broker);
+        bridge.setDynamicallyIncludedDestinations(dynamicallyIncluded);
+        bridge.setExcludedDestinations(excluded);
+        bridge.setStaticallyIncludedDestinations(staticallyIncluded);
+        bridge.start();
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
new file mode 100644
index 0000000..9794337
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.util.Wait;
+
+public class DemandForwardingBridgeTest extends NetworkTestSupport {
+
+    public ActiveMQDestination destination;
+    public byte destinationType;
+    public int deliveryMode;
+    private DemandForwardingBridge bridge;
+
+    /**
+     * Registers the combinations for {@code testSendThenAddConsumer}: both
+     * delivery modes, queue destinations only.
+     */
+    public void initCombosForTestSendThenAddConsumer() {
+        // valueOf replaces the deprecated boxing constructors
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
+    }
+
+    /**
+     * Sends a message on the local broker before any remote consumer exists,
+     * checks via the destination statistics that it was dispatched locally but
+     * neither dequeued nor forwarded, then adds a remote consumer and waits for
+     * the message to be received there and for the forward count to reach one.
+     */
+    public void testSendThenAddConsumer() throws Exception {
+
+        // Start a producer on local broker
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        // Start a consumer on a remote broker
+        final StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+
+        // Send the message to the local broker.
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+
+        // Verify that the message stayed on the local broker.
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        connection1.send(consumerInfo1);
+        Message m = receiveMessage(connection1);
+        assertNotNull(m);
+        // Close consumer to cause the message to rollback.
+        connection1.send(consumerInfo1.createRemoveCommand());
+
+        // Statistics after the local-only delivery: dispatched once, nothing dequeued or forwarded.
+        final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics();
+        assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
+        assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount());
+        assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount());
+
+        // Now create remote consumer that should cause message to move to this
+        // remote consumer.
+        final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        connection2.request(consumerInfo2);
+
+        // Make sure the message was delivered via the remote.
+        assertTrue("message was received", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Message msg = receiveMessage(connection2);
+                if (msg != null) {
+                    // acknowledge on the remote side once the message has arrived
+                    connection2.request(createAck(consumerInfo2, msg, 1, MessageAck.STANDARD_ACK_TYPE));
+                    return true;
+                }
+
+                return false;
+            }
+        }));
+
+        // The forward counter is polled rather than asserted immediately.
+        assertTrue("broker dest stat forwards", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 1 == destinationStatistics.getForwards().getCount();
+            }
+        }));
+
+        assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
+    }
+
+    /**
+     * Registers the combinations for {@code testAddConsumerThenSend}: both
+     * delivery modes, queue and topic destinations.
+     */
+    public void initCombosForTestAddConsumerThenSend() {
+        // valueOf replaces the deprecated boxing constructors
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+    }
+
+    /**
+     * Registers a consumer on the remote broker first, then sends a message on
+     * the local broker and asserts that the remote consumer receives it.
+     */
+    public void testAddConsumerThenSend() throws Exception {
+
+        // Start a producer on local broker
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        // Start a consumer on a remote broker
+        StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
+        connection2.send(consumerInfo);
+
+        // Give demand forwarding bridge a chance to finish forwarding the
+        // subscriptions.
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            // keep the interrupt visible to the caller instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+
+        // Send the message to the local broker.
+        connection1.request(createMessage(producerInfo, destination, deliveryMode));
+        // Make sure the message was delivered via the remote.
+        Message m = receiveMessage(connection2);
+        assertNotNull("message was delivered via the remote", m);
+    }
+
+    /**
+     * Runs the inherited set-up and then starts a demand forwarding bridge
+     * (broker name {@code "local"}, asynchronous dispatch disabled) between the
+     * local and remote transports.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        NetworkBridgeConfiguration bridgeConfig = new NetworkBridgeConfiguration();
+        bridgeConfig.setDispatchAsync(false);
+        bridgeConfig.setBrokerName("local");
+
+        bridge = new DemandForwardingBridge(bridgeConfig, createTransport(), createRemoteTransport());
+        bridge.setBrokerService(broker);
+        bridge.start();
+    }
+
+    /**
+     * Stops the bridge and always runs the inherited tear-down, even when
+     * stopping the bridge fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (bridge != null) {
+                bridge.stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** Builds the JUnit 3 suite for this class using the {@code suite(Class)} helper. */
+    public static Test suite() {
+        return suite(DemandForwardingBridgeTest.class);
+    }
+
+    /** Command-line entry point: runs {@link #suite()} with the JUnit text runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}


Mime
View raw message