activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/4] activemq-artemis git commit: ARTEMIS-204 Improvements on OpenWire
Date Fri, 14 Aug 2015 00:42:14 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index e7582f2..ada2a70 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -40,9 +40,23 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.BrokerInfo;
@@ -77,20 +91,7 @@ import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
-import org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ConsumerState;
@@ -100,7 +101,6 @@ import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
@@ -176,8 +176,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
{
 
    private final Set<String> tempQueues = new ConcurrentHashSet<String>();
 
-   private DataInputWrapper dataInput = new DataInputWrapper();
-
    private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId,
TransactionInfo>();
 
    private volatile AMQSession advisorySession;
@@ -196,96 +194,78 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
{
    @Override
    public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
       try {
-         dataInput.receiveData(buffer);
-      }
-      catch (Throwable t) {
-         ActiveMQServerLogger.LOGGER.error("decoding error", t);
-         return;
-      }
-
-      // this.setDataReceived();
-      while (dataInput.readable()) {
-         try {
-            Object object = null;
-            try {
-               object = wireFormat.unmarshal(dataInput);
-               dataInput.mark();
-            }
-            catch (NotEnoughBytesException e) {
-               //meaning the dataInput hasn't enough bytes for a command.
-               //in that case we just return and waiting for the next
-               //call of bufferReceived()
-               return;
+         Object object = wireFormat.unmarshal(buffer);
+
+         Command command = (Command) object;
+         boolean responseRequired = command.isResponseRequired();
+         int commandId = command.getCommandId();
+         // the connection handles pings, negotiations directly.
+         // and delegate all other commands to manager.
+         if (command.getClass() == KeepAliveInfo.class) {
+            KeepAliveInfo info = (KeepAliveInfo) command;
+            if (info.isResponseRequired()) {
+               info.setResponseRequired(false);
+               protocolManager.sendReply(this, info);
             }
+         }
+         else if (command.getClass() == WireFormatInfo.class) {
+            // amq here starts a read/write monitor thread (detect ttl?)
+            negotiate((WireFormatInfo) command);
+         }
+         else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class
|| command.getClass() == RemoveInfo.class ||
+                  command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class
|| ActiveMQMessage.class.isAssignableFrom(command.getClass()) ||
+                  command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class
|| command.getClass() == DestinationInfo.class ||
+                  command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class)
{
+            Response response = null;
 
-            Command command = (Command) object;
-            boolean responseRequired = command.isResponseRequired();
-            int commandId = command.getCommandId();
-            // the connection handles pings, negotiations directly.
-            // and delegate all other commands to manager.
-            if (command.getClass() == KeepAliveInfo.class) {
-               KeepAliveInfo info = (KeepAliveInfo) command;
-               if (info.isResponseRequired()) {
-                  info.setResponseRequired(false);
-                  protocolManager.sendReply(this, info);
-               }
-            }
-            else if (command.getClass() == WireFormatInfo.class) {
-               // amq here starts a read/write monitor thread (detect ttl?)
-               negotiate((WireFormatInfo) command);
+            if (pendingStop) {
+               response = new ExceptionResponse(this.stopError);
             }
-            else if (command.getClass() == ConnectionInfo.class || command.getClass() ==
ConsumerInfo.class || command.getClass() == RemoveInfo.class || command.getClass() == SessionInfo.class
|| command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass())
|| command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class ||
command.getClass() == DestinationInfo.class || command.getClass() == ShutdownInfo.class ||
command.getClass() == RemoveSubscriptionInfo.class) {
-               Response response = null;
-
-               if (pendingStop) {
-                  response = new ExceptionResponse(this.stopError);
-               }
-               else {
-                  response = ((Command) command).visit(this);
-
-                  if (response instanceof ExceptionResponse) {
-                     if (!responseRequired) {
-                        Throwable cause = ((ExceptionResponse) response).getException();
-                        serviceException(cause);
-                        response = null;
-                     }
-                  }
-               }
-
-               if (responseRequired) {
-                  if (response == null) {
-                     response = new Response();
-                  }
-               }
+            else {
+               response = ((Command) command).visit(this);
 
-               // The context may have been flagged so that the response is not
-               // sent.
-               if (context != null) {
-                  if (context.isDontSendReponse()) {
-                     context.setDontSendReponse(false);
+               if (response instanceof ExceptionResponse) {
+                  if (!responseRequired) {
+                     Throwable cause = ((ExceptionResponse) response).getException();
+                     serviceException(cause);
                      response = null;
                   }
                }
+            }
 
-               if (response != null && !protocolManager.isStopping()) {
-                  response.setCorrelationId(commandId);
-                  dispatchSync(response);
+            if (responseRequired) {
+               if (response == null) {
+                  response = new Response();
                }
+            }
 
+            // The context may have been flagged so that the response is not
+            // sent.
+            if (context != null) {
+               if (context.isDontSendReponse()) {
+                  context.setDontSendReponse(false);
+                  response = null;
+               }
             }
-            else {
-               // note!!! wait for negotiation (e.g. use a countdown latch)
-               // before handling any other commands
-               this.protocolManager.handleCommand(this, command);
+
+            if (response != null && !protocolManager.isStopping()) {
+               response.setCorrelationId(commandId);
+               dispatchSync(response);
             }
+
          }
-         catch (IOException e) {
-            ActiveMQServerLogger.LOGGER.error("error decoding", e);
-         }
-         catch (Throwable t) {
-            ActiveMQServerLogger.LOGGER.error("error decoding", t);
+         else {
+            // note!!! wait for negotiation (e.g. use a countdown latch)
+            // before handling any other commands
+            this.protocolManager.handleCommand(this, command);
          }
       }
+      catch (IOException e) {
+         ActiveMQServerLogger.LOGGER.error("error decoding", e);
+      }
+      catch (Throwable t) {
+         ActiveMQServerLogger.LOGGER.error("error decoding", t);
+      }
    }
 
    private void negotiate(WireFormatInfo command) throws IOException {
@@ -624,6 +604,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
{
 
    public void serviceExceptionAsync(final IOException e) {
       if (asyncException.compareAndSet(false, true)) {
+         // Why this is not through an executor?
          new Thread("Async Exception Handler") {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 260ee02..90518ec 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire;
 
+import javax.jms.JMSException;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -23,13 +24,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
-
-import javax.jms.JMSException;
+import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMapMessage;
@@ -51,14 +58,6 @@ import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtbuf.UTF8Buffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.TypedProperties;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
 
 public class OpenWireMessageConverter implements MessageConverter {
 
@@ -429,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter {
       }
       amqMsg.setBrokerInTime(brokerInTime);
 
-      ActiveMQBuffer buffer = coreMessage.getBodyBuffer();
+      ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
       if (buffer != null) {
          buffer.resetReaderIndex();
          byte[] bytes = null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 10d67a1..8c20c46 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -31,18 +31,38 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 
 import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -66,26 +86,8 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.core.security.CheckType;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
@@ -183,8 +185,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
No
 
    @Override
    public void addChannelHandlers(ChannelPipeline pipeline) {
-      // TODO Auto-generated method stub
-
+      // each read will have a full packet with this
+      pipeline.addLast("packet-decipher", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
0, DataConstants.SIZE_INT));
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 741c32b..12ddb94 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -275,12 +275,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
          return HandleStatus.BUSY;
       }
 
-      // TODO - https://jira.jboss.org/browse/HORNETQ-533
-      // if (!writeReady.get())
-      // {
-      // return HandleStatus.BUSY;
-      // }
-
       synchronized (lock) {
          // If the consumer is stopped then we don't accept the message, it
          // should go back into the

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 83c94ee..faa947e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -27,18 +27,18 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.concurrent.TimeUnit;
-
 public class SimpleOpenWireTest extends BasicOpenWireTest {
 
    @Rule
@@ -300,6 +300,42 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       }
    }
 
+   @Test
+   public void testFailoverTransportReconnect() throws Exception {
+      Connection exConn = null;
+
+      try {
+         String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+
+         Queue queue = new ActiveMQQueue(durableQueueName);
+
+         exConn = exFact.createConnection();
+         exConn.start();
+
+         Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer messageProducer = session.createProducer(queue);
+         messageProducer.send(session.createTextMessage("Test"));
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         assertNotNull(consumer.receive(5000));
+
+         server.stop();
+         Thread.sleep(3000);
+
+         server.start();
+         server.waitForActivation(10, TimeUnit.SECONDS);
+
+         messageProducer.send(session.createTextMessage("Test2"));
+         assertNotNull(consumer.receive(5000));
+      }
+      finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+   }
+
    /**
     * This is the example shipped with the distribution
     *
@@ -309,41 +345,30 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
    public void testOpenWireExample() throws Exception {
       Connection exConn = null;
 
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
       try {
-         String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
-         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
 
-         // Step 2. Perfom a lookup on the queue
          Queue queue = new ActiveMQQueue(durableQueueName);
 
-         // Step 4.Create a JMS Connection
          exConn = exFact.createConnection();
 
-         // Step 10. Start the Connection
          exConn.start();
 
-         // Step 5. Create a JMS Session
          Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-         // Step 6. Create a JMS Message Producer
          MessageProducer producer = session.createProducer(queue);
 
-         // Step 7. Create a Text Message
          TextMessage message = session.createTextMessage("This is a text message");
 
-         //System.out.println("Sent message: " + message.getText());
-
-         // Step 8. Send the Message
          producer.send(message);
 
-         // Step 9. Create a JMS Message Consumer
          MessageConsumer messageConsumer = session.createConsumer(queue);
 
-         // Step 11. Receive the message
          TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
 
-         System.out.println("Received message: " + messageReceived);
-
          assertEquals("This is a text message", messageReceived.getText());
       }
       finally {
@@ -354,39 +379,88 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
 
    }
 
+
+   /**
+    * This is the example shipped with the distribution
+    *
+    * @throws Exception
+    */
    @Test
-   public void testFailoverTransportReconnect() throws Exception {
+   public void testMultipleConsumers() throws Exception {
       Connection exConn = null;
 
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
       try {
-         String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
-         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
 
          Queue queue = new ActiveMQQueue(durableQueueName);
 
          exConn = exFact.createConnection();
+
          exConn.start();
 
          Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer messageProducer = session.createProducer(queue);
-         messageProducer.send(session.createTextMessage("Test"));
 
-         MessageConsumer consumer = session.createConsumer(queue);
-         assertNotNull(consumer.receive(5000));
+         MessageProducer producer = session.createProducer(queue);
 
-         server.stop();
-         Thread.sleep(3000);
+         TextMessage message = session.createTextMessage("This is a text message");
 
-         server.start();
-         server.waitForActivation(10, TimeUnit.SECONDS);
+         producer.send(message);
 
-         messageProducer.send(session.createTextMessage("Test2"));
-         assertNotNull(consumer.receive(5000));
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+
+         TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+         assertEquals("This is a text message", messageReceived.getText());
       }
       finally {
          if (exConn != null) {
             exConn.close();
          }
       }
+
    }
+
+   @Test
+   public void testMixedOpenWireExample() throws Exception {
+      Connection openConn = null;
+
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+      ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
+
+      Queue queue = new ActiveMQQueue("exampleQueue");
+
+      openConn = openCF.createConnection();
+
+      openConn.start();
+
+      Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer producer = openSession.createProducer(queue);
+
+      TextMessage message = openSession.createTextMessage("This is a text message");
+
+      producer.send(message);
+
+      org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
+
+      Connection artemisConn = artemisCF.createConnection();
+      Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      artemisConn.start();
+      MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
+
+      TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+      assertEquals("This is a text message", messageReceived.getText());
+
+      openConn.close();
+      artemisConn.close();
+
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
new file mode 100644
index 0000000..8d315d3
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+
+/** This is useful to debug connection ordering. There's only one connection being made from
these tests */
+public class VerySimpleOenwireTest extends OpenWireTestBase {
+
+   /**
+    * This is the example shipped with the distribution
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testOpenWireExample() throws Exception {
+      Connection exConn = null;
+
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+      try {
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+
+         Queue queue = new ActiveMQQueue("exampleQueue");
+
+         exConn = exFact.createConnection();
+
+         exConn.start();
+
+         Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage message = session.createTextMessage("This is a text message");
+
+         producer.send(message);
+
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+
+         TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+         assertEquals("This is a text message", messageReceived.getText());
+      }
+      finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testMixedOpenWireExample() throws Exception {
+      Connection openConn = null;
+
+      SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+      ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
+
+      Queue queue = new ActiveMQQueue("exampleQueue");
+
+      openConn = openCF.createConnection();
+
+      openConn.start();
+
+      Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer producer = openSession.createProducer(queue);
+
+      TextMessage message = openSession.createTextMessage("This is a text message");
+
+      producer.send(message);
+
+      org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
+
+      Connection artemisConn = artemisCF.createConnection();
+      Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      artemisConn.start();
+      MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
+
+      TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+      assertEquals("This is a text message", messageReceived.getText());
+
+      openConn.close();
+      artemisConn.close();
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
index a770183..321fda7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.transports.netty;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
@@ -24,10 +28,6 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
 public class ActiveMQFrameDecoder2Test extends ActiveMQTestBase {
 
    private static final int MSG_CNT = 10000;


Mime
View raw message