activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [31/51] [partial] https://issues.apache.org/jira/browse/OPENWIRE-1
Date Thu, 24 Jul 2014 14:23:20 GMT
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/OpenWireInteropTestSupport.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/OpenWireInteropTestSupport.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/OpenWireInteropTestSupport.java
new file mode 100644
index 0000000..68fe869
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/OpenWireInteropTestSupport.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.JMSException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+import org.apache.activemq.openwire.codec.OpenWireFormatFactory;
+import org.apache.activemq.openwire.commands.BrokerInfo;
+import org.apache.activemq.openwire.commands.Command;
+import org.apache.activemq.openwire.commands.KeepAliveInfo;
+import org.apache.activemq.openwire.commands.Message;
+import org.apache.activemq.openwire.commands.MessageDispatch;
+import org.apache.activemq.openwire.commands.Response;
+import org.apache.activemq.openwire.commands.ShutdownInfo;
+import org.apache.activemq.openwire.commands.WireFormatInfo;
+import org.apache.activemq.openwire.util.TcpTransport;
+import org.apache.activemq.openwire.util.TransportListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class used in testing the interoperability between the OpenWire
+ * commands and Marshalers in this library and those in ActiveMQ.
+ */
+public abstract class OpenWireInteropTestSupport implements TransportListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenWireInteropTestSupport.class);
+
+    @Rule public TestName name = new TestName();
+
+    protected BrokerService brokerService;
+
+    private TcpTransport transport;
+    protected URI connectionURI;
+
+    private OpenWireFormatFactory factory;
+    protected OpenWireFormat wireFormat;
+
+    private CountDownLatch connected;
+
+    private WireFormatInfo remoteWireformatInfo;
+    private BrokerInfo remoteInfo;
+    private Exception failureCause;
+    private final AtomicInteger requestIdGenerator = new AtomicInteger(1);
+
+    private final Map<Integer, CountDownLatch> requestMap =
+        new ConcurrentHashMap<Integer, CountDownLatch>();
+
+    protected Command latest;
+    protected final Queue<Message> messages = new LinkedList<Message>();
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        factory = new OpenWireFormatFactory();
+        factory.setVersion(getOpenWireVersion());
+        factory.setCacheEnabled(false);
+        factory.setTightEncodingEnabled(isTightEncodingEnabled());
+
+        wireFormat = factory.createWireFormat();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        disconnect();
+
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    protected abstract int getOpenWireVersion();
+
+    protected abstract boolean isTightEncodingEnabled();
+
+    protected void connect() throws Exception {
+        connected = new CountDownLatch(1);
+
+        transport = new TcpTransport(wireFormat, connectionURI);
+        transport.setTransportListener(this);
+        transport.start();
+
+        transport.oneway(wireFormat.getPreferedWireFormatInfo());
+    }
+
+    protected void disconnect() throws Exception {
+        if (transport != null && transport.isStarted()) {
+            ShutdownInfo done = new ShutdownInfo();
+            transport.oneway(done);
+            Thread.sleep(50);
+            transport.stop();
+        }
+    }
+
+    protected boolean request(Command command, long timeout, TimeUnit units) throws Exception {
+        command.setCommandId(requestIdGenerator.getAndIncrement());
+        command.setResponseRequired(true);
+        CountDownLatch complete = new CountDownLatch(1);
+        requestMap.put(new Integer(command.getCommandId()), complete);
+        transport.oneway(command);
+        return complete.await(timeout, units);
+    }
+
+    protected boolean awaitConnected(long time, TimeUnit unit) throws InterruptedException {
+        return connected.await(time, unit);
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(true);
+
+        TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0?transport.trace=true&trace=true");
+        connectionURI = connector.getPublishableConnectURI();
+        LOG.debug("Using openwire port: {}", connectionURI);
+        return brokerService;
+    }
+
+    @Override
+    public void onCommand(Object command) {
+        try {
+            if (command instanceof WireFormatInfo) {
+                handleWireFormatInfo((WireFormatInfo) command);
+            } else if (command instanceof KeepAliveInfo) {
+                handleKeepAliveInfo((KeepAliveInfo) command);
+            } else if (command instanceof BrokerInfo) {
+                handleBrokerInfo((BrokerInfo) command);
+            } else if (command instanceof Response) {
+                Response response = (Response) command;
+                this.latest = response;
+                LOG.info("Received response for request: {}, response = {}", response.getCorrelationId(), latest);
+                CountDownLatch done = requestMap.get(response.getCorrelationId());
+                if (done != null) {
+                    done.countDown();
+                }
+            } else if (command instanceof MessageDispatch) {
+                LOG.info("Received new MessageDispatch: {}", command);
+                MessageDispatch dispatch = (MessageDispatch) command;
+                messages.add(dispatch.getMessage());
+            } else {
+                LOG.info("Received unknown command: {}", command);
+            }
+        } catch (Exception e) {
+            failureCause = e;
+        }
+    }
+
+    @Override
+    public void onException(IOException error) {
+        failureCause = error;
+    }
+
+    @Override
+    public void transportInterupted() {
+    }
+
+    @Override
+    public void transportResumed() {
+    }
+
+    public WireFormatInfo getRemoteWireFormatInfo() {
+        return this.remoteWireformatInfo;
+    }
+
+    public BrokerInfo getRemoteBrokerInfo() {
+        return this.remoteInfo;
+    }
+
+    public Command getLastCommandReceived() {
+        return this.latest;
+    }
+
+    public boolean isFailed() {
+        return this.failureCause != null;
+    }
+
+    protected void handleWireFormatInfo(WireFormatInfo info) throws Exception {
+        LOG.info("Received remote WireFormatInfo: {}", info);
+        this.remoteWireformatInfo = info;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this + " before negotiation: " + wireFormat);
+        }
+        if (!info.isValid()) {
+            onException(new IOException("Remote wire format magic is invalid"));
+        } else if (info.getVersion() < getOpenWireVersion()) {
+            onException(new IOException("Remote wire format (" + info.getVersion() +
+                        ") is lower the minimum version required (" + getOpenWireVersion() + ")"));
+        }
+
+        wireFormat.renegotiateWireFormat(info);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this + " after negotiation: " + wireFormat);
+        }
+
+        connected.countDown();
+    }
+
+    protected void handleKeepAliveInfo(KeepAliveInfo info) throws Exception {
+        LOG.info("Received remote KeepAliveInfo: {}", info);
+        if (info.isResponseRequired()) {
+            KeepAliveInfo response = new KeepAliveInfo();
+            transport.oneway(response);
+        }
+    }
+
+    protected void handleBrokerInfo(BrokerInfo info) throws Exception {
+        LOG.info("Received remote BrokerInfo: {}", info);
+        this.remoteInfo = info;
+    }
+
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/OpenWireInteropTests.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/OpenWireInteropTests.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/OpenWireInteropTests.java
new file mode 100644
index 0000000..f4e2951
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/OpenWireInteropTests.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.openwire.commands.ConnectionInfo;
+import org.apache.activemq.openwire.commands.ConsumerInfo;
+import org.apache.activemq.openwire.commands.Message;
+import org.apache.activemq.openwire.commands.OpenWireQueue;
+import org.apache.activemq.openwire.commands.OpenWireTextMessage;
+import org.apache.activemq.openwire.commands.OpenWireTopic;
+import org.apache.activemq.openwire.commands.ProducerInfo;
+import org.apache.activemq.openwire.util.Wait;
+import org.apache.activemq.openwire.utils.OpenWireConnection;
+import org.apache.activemq.openwire.utils.OpenWireConsumer;
+import org.apache.activemq.openwire.utils.OpenWireProducer;
+import org.apache.activemq.openwire.utils.OpenWireSession;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public abstract class OpenWireInteropTests extends OpenWireInteropTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenWireInteropTests.class);
+
+    protected OpenWireConnection connectionId;
+    protected boolean tightEncodingEnabled;
+
+    public OpenWireInteropTests(boolean tightEncodingEnabled) {
+        this.tightEncodingEnabled = tightEncodingEnabled;
+    }
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { Boolean.FALSE }, { Boolean.TRUE } });
+    }
+
+    @Override
+    protected boolean isTightEncodingEnabled() {
+        return tightEncodingEnabled;
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        connectionId = new OpenWireConnection();
+    }
+
+    @Test(timeout = 60000)
+    public void testCanConnect() throws Exception {
+        connect();
+        assertTrue(awaitConnected(10, TimeUnit.SECONDS));
+        assertEquals(getOpenWireVersion(), getRemoteWireFormatInfo().getVersion());
+
+        if (isTightEncodingEnabled()) {
+            LOG.info("Should be using tight encoding: are we? {}", wireFormat.isTightEncodingEnabled());
+            assertTrue(wireFormat.isTightEncodingEnabled());
+        } else {
+            LOG.info("Should not be using tight encoding: are we? {}", wireFormat.isTightEncodingEnabled());
+            assertFalse(wireFormat.isTightEncodingEnabled());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateConnection() throws Exception {
+        connect();
+        assertTrue(awaitConnected(10, TimeUnit.SECONDS));
+        assertTrue(request(createConnectionInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getCurrentConnectionsCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateSession() throws Exception {
+        connect();
+        assertTrue(awaitConnected(10, TimeUnit.SECONDS));
+        assertTrue(request(createConnectionInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getCurrentConnectionsCount());
+        OpenWireSession sessionId = connectionId.createOpenWireSession();
+        assertTrue(request(sessionId.createSessionInfo(), 10, TimeUnit.SECONDS));
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateProducer() throws Exception {
+        connect();
+        assertTrue(awaitConnected(10, TimeUnit.SECONDS));
+        assertTrue(request(createConnectionInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getCurrentConnectionsCount());
+
+        OpenWireSession sessionId = connectionId.createOpenWireSession();
+        assertTrue(request(sessionId.createSessionInfo(), 10, TimeUnit.SECONDS));
+        OpenWireProducer producerId = sessionId.createOpenWireProducer();
+
+        ProducerInfo info = producerId.createProducerInfo(new OpenWireTopic(name.getMethodName() + "-Topic"));
+        info.setDispatchAsync(false);
+        assertTrue(request(info, 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getTopicProducers().length);
+
+        assertTrue(request(producerId.createRemoveInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(0, brokerService.getAdminView().getTopicProducers().length);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateConsumer() throws Exception {
+        connect();
+        assertTrue(awaitConnected(10, TimeUnit.SECONDS));
+        assertTrue(request(createConnectionInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getCurrentConnectionsCount());
+
+        OpenWireSession sessionId = connectionId.createOpenWireSession();
+        assertTrue(request(sessionId.createSessionInfo(), 10, TimeUnit.SECONDS));
+        OpenWireConsumer consumerId = sessionId.createOpenWireConsumer();
+
+        ConsumerInfo info = consumerId.createConsumerInfo(new OpenWireTopic(name.getMethodName() + "-Topic"));
+        info.setDispatchAsync(false);
+        assertTrue(request(info, 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getTopicSubscribers().length);
+
+        assertTrue(request(consumerId.createRemoveInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(0, brokerService.getAdminView().getTopicSubscribers().length);
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageToQueue() throws Exception {
+        connect();
+        assertTrue(awaitConnected(10, TimeUnit.SECONDS));
+        assertTrue(request(createConnectionInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getCurrentConnectionsCount());
+
+        OpenWireSession sessionId = connectionId.createOpenWireSession();
+        assertTrue(request(sessionId.createSessionInfo(), 10, TimeUnit.SECONDS));
+        OpenWireProducer producerId = sessionId.createOpenWireProducer();
+
+        OpenWireQueue queue = new OpenWireQueue(name.getMethodName() + "-Queue");
+
+        ProducerInfo info = producerId.createProducerInfo(queue);
+        info.setDispatchAsync(false);
+        assertTrue(request(info, 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        OpenWireTextMessage message = new OpenWireTextMessage();
+        message.setText("test");
+        message.setTimestamp(System.currentTimeMillis());
+        message.setMessageId(producerId.getNextMessageId());
+        message.setProducerId(producerId.getProducerId());
+        message.setDestination(queue);
+        message.onSend();
+
+        assertTrue(request(message, 10, TimeUnit.SECONDS));
+        assertEquals(1, getProxyToQueue(queue.getPhysicalName()).getQueueSize());
+
+        assertTrue(request(producerId.createRemoveInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
+    }
+
+    @Test(timeout = 60000)
+    public void testConsumeMessageFromQueue() throws Exception {
+        connect();
+        assertTrue(awaitConnected(10, TimeUnit.SECONDS));
+        assertTrue(request(createConnectionInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getCurrentConnectionsCount());
+
+        OpenWireSession sessionId = connectionId.createOpenWireSession();
+        assertTrue(request(sessionId.createSessionInfo(), 10, TimeUnit.SECONDS));
+        OpenWireProducer producerId = sessionId.createOpenWireProducer();
+
+        OpenWireQueue queue = new OpenWireQueue(name.getMethodName() + "-Queue");
+
+        ProducerInfo producerInfo = producerId.createProducerInfo(queue);
+        producerInfo.setDispatchAsync(false);
+        assertTrue(request(producerInfo, 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        OpenWireTextMessage message = new OpenWireTextMessage();
+        message.setText("test");
+        message.setTimestamp(System.currentTimeMillis());
+        message.setMessageId(producerId.getNextMessageId());
+        message.setProducerId(producerId.getProducerId());
+        message.setDestination(queue);
+        message.onSend();
+
+        assertTrue(request(message, 10, TimeUnit.SECONDS));
+        assertEquals(1, getProxyToQueue(queue.getPhysicalName()).getQueueSize());
+
+        OpenWireConsumer consumerId = sessionId.createOpenWireConsumer();
+        ConsumerInfo consumerInfo = consumerId.createConsumerInfo(queue);
+        consumerInfo.setDispatchAsync(false);
+        consumerInfo.setPrefetchSize(1);
+        assertTrue(request(consumerInfo, 10, TimeUnit.SECONDS));
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+
+        assertTrue("Should have received a message", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return messages.size() == 1;
+            }
+        }));
+
+        Message incoming = messages.poll();
+        assertTrue(incoming instanceof OpenWireTextMessage);
+        OpenWireTextMessage received = (OpenWireTextMessage) incoming;
+        assertEquals("test", received.getText());
+
+        assertTrue(request(consumerId.createRemoveInfo(), 10, TimeUnit.SECONDS));
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+    }
+
+    protected ConnectionInfo createConnectionInfo() {
+        ConnectionInfo info = new ConnectionInfo(connectionId.getConnectionId());
+        info.setManageable(false);
+        info.setFaultTolerant(false);
+        info.setClientId(name.getMethodName());
+        return info;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/WireFormatInfoMarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/WireFormatInfoMarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/WireFormatInfoMarshaledSizeTest.java
new file mode 100644
index 0000000..fed1abd
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/WireFormatInfoMarshaledSizeTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+import org.apache.activemq.openwire.codec.OpenWireFormatFactory;
+import org.apache.activemq.openwire.commands.WireFormatInfo;
+import org.apache.activemq.util.ByteSequence;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the WireFormatInfo marshals and un-marshals correctly.
+ */
+public abstract class WireFormatInfoMarshaledSizeTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WireFormatInfoMarshaledSizeTest.class);
+
+    private OpenWireFormat wireFormat;
+
+    public int getExpectedMarshaledSize() {
+        return 244;
+    }
+
+    public abstract int getVersion();
+
+    @Before
+    public void setUp() throws Exception {
+        OpenWireFormatFactory factory = new OpenWireFormatFactory();
+        factory.setVersion(getVersion());
+
+        wireFormat = factory.createWireFormat();
+    }
+
+    @Test
+    public void testMarshalledSize1() throws Exception {
+        WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+
+        Buffer result = wireFormat.marshal(info);
+        assertNotNull(result);
+        LOG.info("Size of marshalled object: {}", result.getLength());
+        assertEquals(getExpectedMarshaledSize(), result.getLength());
+
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(result.toByteArray());
+        DataInputStream input = new DataInputStream(bytesIn);
+
+        int size = input.readInt();
+        assertEquals(getExpectedMarshaledSize() - 4, size);
+    }
+
+    @Test
+    public void testMarshalledSize2() throws Exception {
+        WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+        wireFormat.marshal(info, dataOut);
+        dataOut.close();
+        ByteSequence result = new ByteSequence(bytesOut.toByteArray());
+
+        LOG.info("Size of marshalled object: {}", result.getLength());
+        assertEquals(getExpectedMarshaledSize(), result.getLength());
+
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(result.data);
+        DataInputStream input = new DataInputStream(bytesIn);
+
+        int size = input.readInt();
+        assertEquals(getExpectedMarshaledSize() - 4, size);
+    }
+
+    @Test
+    public void testMarshalThenUnmarshal1() throws Exception {
+        final WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+        LOG.info("Original: {}", info);
+
+        Buffer marshaledForm = wireFormat.marshal(info);
+        assertNotNull(marshaledForm);
+
+        Object result = wireFormat.unmarshal(marshaledForm);
+        assertNotNull(result);
+        assertTrue(result instanceof WireFormatInfo);
+        WireFormatInfo duplicate = (WireFormatInfo) result;
+        LOG.info("Duplicated: {}", duplicate);
+
+        assertEquals(info.getVersion(), duplicate.getVersion());
+        assertEquals(info.getCacheSize(), duplicate.getCacheSize());
+        assertEquals(info.getMaxFrameSize(), duplicate.getMaxFrameSize());
+        assertEquals(info.getMaxInactivityDuration(), duplicate.getMaxInactivityDuration());
+        assertEquals(info.getMaxInactivityDurationInitalDelay(), duplicate.getMaxInactivityDurationInitalDelay());
+        assertEquals(info.isCacheEnabled(), duplicate.isCacheEnabled());
+        assertEquals(info.isSizePrefixDisabled(), duplicate.isSizePrefixDisabled());
+        assertEquals(info.isTcpNoDelayEnabled(), duplicate.isTcpNoDelayEnabled());
+        assertEquals(info.isStackTraceEnabled(), duplicate.isTcpNoDelayEnabled());
+        assertEquals(info.isTightEncodingEnabled(), duplicate.isTightEncodingEnabled());
+    }
+
+    @Test
+    public void testMarshalThenUnmarshal2() throws Exception {
+        final WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+        LOG.info("Original: {}", info);
+
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+        wireFormat.marshal(info, dataOut);
+        dataOut.close();
+
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytesOut.toByteArray());
+        DataInputStream dataIn = new DataInputStream(bytesIn);
+
+        Object result = wireFormat.unmarshal(dataIn);
+        assertNotNull(result);
+        assertTrue(result instanceof WireFormatInfo);
+        WireFormatInfo duplicate = (WireFormatInfo) result;
+        LOG.info("Duplicated: {}", duplicate);
+
+        assertEquals(info.getVersion(), duplicate.getVersion());
+        assertEquals(info.getCacheSize(), duplicate.getCacheSize());
+        assertEquals(info.getMaxFrameSize(), duplicate.getMaxFrameSize());
+        assertEquals(info.getMaxInactivityDuration(), duplicate.getMaxInactivityDuration());
+        assertEquals(info.getMaxInactivityDurationInitalDelay(), duplicate.getMaxInactivityDurationInitalDelay());
+        assertEquals(info.isCacheEnabled(), duplicate.isCacheEnabled());
+        assertEquals(info.isSizePrefixDisabled(), duplicate.isSizePrefixDisabled());
+        assertEquals(info.isTcpNoDelayEnabled(), duplicate.isTcpNoDelayEnabled());
+        assertEquals(info.isStackTraceEnabled(), duplicate.isTcpNoDelayEnabled());
+        assertEquals(info.isTightEncodingEnabled(), duplicate.isTightEncodingEnabled());
+    }
+
+    @Test
+    public void testMarshalThenUnmarshal3() throws Exception {
+        final WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+        LOG.info("Original: {}", info);
+
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+        wireFormat.marshal(info, dataOut);
+        dataOut.close();
+
+        Buffer marshaledForm = new Buffer(bytesOut.toByteArray());
+
+        Object result = wireFormat.unmarshal(marshaledForm);
+        assertNotNull(result);
+        assertTrue(result instanceof WireFormatInfo);
+        WireFormatInfo duplicate = (WireFormatInfo) result;
+        LOG.info("Duplicated: {}", duplicate);
+
+        assertEquals(info.getVersion(), duplicate.getVersion());
+        assertEquals(info.getCacheSize(), duplicate.getCacheSize());
+        assertEquals(info.getMaxFrameSize(), duplicate.getMaxFrameSize());
+        assertEquals(info.getMaxInactivityDuration(), duplicate.getMaxInactivityDuration());
+        assertEquals(info.getMaxInactivityDurationInitalDelay(), duplicate.getMaxInactivityDurationInitalDelay());
+        assertEquals(info.isCacheEnabled(), duplicate.isCacheEnabled());
+        assertEquals(info.isSizePrefixDisabled(), duplicate.isSizePrefixDisabled());
+        assertEquals(info.isTcpNoDelayEnabled(), duplicate.isTcpNoDelayEnabled());
+        assertEquals(info.isStackTraceEnabled(), duplicate.isTcpNoDelayEnabled());
+        assertEquals(info.isTightEncodingEnabled(), duplicate.isTightEncodingEnabled());
+    }
+
+    @Test
+    public void testMarshalThenUnmarshal4() throws Exception {
+        final WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+        LOG.info("Original: {}", info);
+
+        Buffer marshaledForm = wireFormat.marshal(info);
+        assertNotNull(marshaledForm);
+
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(marshaledForm.data);
+        DataInputStream dataIn = new DataInputStream(bytesIn);
+
+        Object result = wireFormat.unmarshal(dataIn);
+        assertNotNull(result);
+        assertTrue(result instanceof WireFormatInfo);
+        WireFormatInfo duplicate = (WireFormatInfo) result;
+        LOG.info("Duplicated: {}", duplicate);
+
+        assertEquals(info.getVersion(), duplicate.getVersion());
+        assertEquals(info.getCacheSize(), duplicate.getCacheSize());
+        assertEquals(info.getMaxFrameSize(), duplicate.getMaxFrameSize());
+        assertEquals(info.getMaxInactivityDuration(), duplicate.getMaxInactivityDuration());
+        assertEquals(info.getMaxInactivityDurationInitalDelay(), duplicate.getMaxInactivityDurationInitalDelay());
+        assertEquals(info.isCacheEnabled(), duplicate.isCacheEnabled());
+        assertEquals(info.isSizePrefixDisabled(), duplicate.isSizePrefixDisabled());
+        assertEquals(info.isTcpNoDelayEnabled(), duplicate.isTcpNoDelayEnabled());
+        assertEquals(info.isStackTraceEnabled(), duplicate.isTcpNoDelayEnabled());
+        assertEquals(info.isTightEncodingEnabled(), duplicate.isTightEncodingEnabled());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v1/OpenWireV1Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v1/OpenWireV1Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v1/OpenWireV1Test.java
new file mode 100644
index 0000000..c185561
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v1/OpenWireV1Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v1;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV1Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV1Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v1/WireFormatInfoV1MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v1/WireFormatInfoV1MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v1/WireFormatInfoV1MarshaledSizeTest.java
new file mode 100644
index 0000000..40af674
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v1/WireFormatInfoV1MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v1;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV1MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v10/OpenWireV10Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v10/OpenWireV10Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v10/OpenWireV10Test.java
new file mode 100644
index 0000000..5f2eb19
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v10/OpenWireV10Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v10;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV10Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV10Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 10;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v10/WireFormatInfoV10MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v10/WireFormatInfoV10MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v10/WireFormatInfoV10MarshaledSizeTest.java
new file mode 100644
index 0000000..54d65bd
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v10/WireFormatInfoV10MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v10;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV10MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 10;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v2/OpenWireV2Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v2/OpenWireV2Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v2/OpenWireV2Test.java
new file mode 100644
index 0000000..14e1fae
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v2/OpenWireV2Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v2;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV2Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV2Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v2/WireFormatInfoV2MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v2/WireFormatInfoV2MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v2/WireFormatInfoV2MarshaledSizeTest.java
new file mode 100644
index 0000000..1c19533
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v2/WireFormatInfoV2MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v2;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV2MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v3/OpenWireV3Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v3/OpenWireV3Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v3/OpenWireV3Test.java
new file mode 100644
index 0000000..de32688
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v3/OpenWireV3Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v3;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV3Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV3Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v3/WireFormatInfoV3MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v3/WireFormatInfoV3MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v3/WireFormatInfoV3MarshaledSizeTest.java
new file mode 100644
index 0000000..68978fe
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v3/WireFormatInfoV3MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v3;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV3MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v4/OpenWireV4Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v4/OpenWireV4Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v4/OpenWireV4Test.java
new file mode 100644
index 0000000..6cde172
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v4/OpenWireV4Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v4;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV4Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV4Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 4;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v4/WireFormatInfoV4MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v4/WireFormatInfoV4MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v4/WireFormatInfoV4MarshaledSizeTest.java
new file mode 100644
index 0000000..e448935
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v4/WireFormatInfoV4MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v4;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV4MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 4;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v5/OpenWireV5Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v5/OpenWireV5Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v5/OpenWireV5Test.java
new file mode 100644
index 0000000..3331c5b
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v5/OpenWireV5Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v5;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV5Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV5Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 5;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v5/WireFormatInfoV5MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v5/WireFormatInfoV5MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v5/WireFormatInfoV5MarshaledSizeTest.java
new file mode 100644
index 0000000..63303ad
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v5/WireFormatInfoV5MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v5;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV5MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 5;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v6/OpenWireV6Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v6/OpenWireV6Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v6/OpenWireV6Test.java
new file mode 100644
index 0000000..d33a5b8
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v6/OpenWireV6Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v6;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV6Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV6Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 6;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v6/WireFormatInfoV6MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v6/WireFormatInfoV6MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v6/WireFormatInfoV6MarshaledSizeTest.java
new file mode 100644
index 0000000..fca4250
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v6/WireFormatInfoV6MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v6;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV6MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 6;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v7/OpenWireV7Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v7/OpenWireV7Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v7/OpenWireV7Test.java
new file mode 100644
index 0000000..3cdabda
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v7/OpenWireV7Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v7;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV7Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV7Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 7;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v7/WireFormatInfoV7MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v7/WireFormatInfoV7MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v7/WireFormatInfoV7MarshaledSizeTest.java
new file mode 100644
index 0000000..b28bf48
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v7/WireFormatInfoV7MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v7;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV7MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 7;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v8/OpenWireV8Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v8/OpenWireV8Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v8/OpenWireV8Test.java
new file mode 100644
index 0000000..acb8930
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v8/OpenWireV8Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v8;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV8Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV8Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 8;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v8/WireFormatInfoV8MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v8/WireFormatInfoV8MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v8/WireFormatInfoV8MarshaledSizeTest.java
new file mode 100644
index 0000000..2e1b816
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v8/WireFormatInfoV8MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v8;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV8MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 8;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v9/OpenWireV9Test.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v9/OpenWireV9Test.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v9/OpenWireV9Test.java
new file mode 100644
index 0000000..b91c699
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v9/OpenWireV9Test.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v9;
+
+import org.apache.activemq.openwire.codec.OpenWireInteropTests;
+
+public class OpenWireV9Test extends OpenWireInteropTests {
+
+    /**
+     * @param tightEncodingEnabled
+     */
+    public OpenWireV9Test(boolean tightEncodingEnabled) {
+        super(tightEncodingEnabled);
+    }
+
+    @Override
+    protected int getOpenWireVersion() {
+        return 9;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v9/WireFormatInfoV9MarshaledSizeTest.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v9/WireFormatInfoV9MarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v9/WireFormatInfoV9MarshaledSizeTest.java
new file mode 100644
index 0000000..6bea3a8
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/v9/WireFormatInfoV9MarshaledSizeTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec.v9;
+
+import org.apache.activemq.openwire.codec.WireFormatInfoMarshaledSizeTest;
+
+/**
+ * Test marshaling with WireFormatInfo for this Version.
+ */
+public class WireFormatInfoV9MarshaledSizeTest extends WireFormatInfoMarshaledSizeTest {
+
+    @Override
+    public int getVersion() {
+        return 9;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/TcpTransport.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/TcpTransport.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/TcpTransport.java
new file mode 100644
index 0000000..48ba3d1
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/TcpTransport.java
@@ -0,0 +1,414 @@
+/**
+gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
+import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
+import org.apache.activemq.util.ServiceStopper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class TcpTransport implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
+
+    protected final URI remoteLocation;
+    protected final OpenWireFormat wireFormat;
+
+    protected int connectionTimeout = 30000;
+    protected int socketBufferSize = 64 * 1024;
+    protected int ioBufferSize = 8 * 1024;
+    protected Socket socket;
+    protected DataOutputStream dataOut;
+    protected DataInputStream dataIn;
+
+    protected int minmumWireFormatVersion;
+    protected SocketFactory socketFactory;
+    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
+    protected volatile int receiveCounter;
+
+    private Thread runnerThread;
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicBoolean stopping = new AtomicBoolean(false);
+    private AtomicBoolean stopped = new AtomicBoolean(false);
+    private TransportListener transportListener;
+
+    /**
+     * Connect to a remote Node - e.g. a Broker
+     *
+     * @param wireFormat
+     * @param socketFactory
+     * @param remoteLocation
+     * @param localLocation
+     *        - e.g. local InetAddress and local port
+     * @throws IOException
+     * @throws UnknownHostException
+     */
+    public TcpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
+        this.wireFormat = wireFormat;
+        this.socketFactory = SocketFactory.getDefault();
+        try {
+            this.socket = socketFactory.createSocket();
+        } catch (SocketException e) {
+            this.socket = null;
+        }
+        this.remoteLocation = remoteLocation;
+    }
+
+    /**
+     * A one way asynchronous send
+     */
+    public void oneway(Object command) throws IOException {
+        checkStarted();
+        wireFormat.marshal(command, dataOut);
+        dataOut.flush();
+    }
+
+    /**
+     * reads packets from a Socket
+     */
+    public void run() {
+        LOG.trace("TCP consumer thread for {} starting", this);
+        this.runnerThread = Thread.currentThread();
+        try {
+            while (!isStopped()) {
+                doRun();
+            }
+        } catch (IOException e) {
+            stoppedLatch.get().countDown();
+            onException(e);
+        } catch (Throwable e) {
+            stoppedLatch.get().countDown();
+            IOException ioe = new IOException("Unexpected error occured: " + e);
+            ioe.initCause(e);
+            onException(ioe);
+        } finally {
+            stoppedLatch.get().countDown();
+        }
+    }
+
+    protected void doRun() throws IOException {
+        try {
+            Object command = readCommand();
+            doConsume(command);
+        } catch (SocketTimeoutException e) {
+        } catch (InterruptedIOException e) {
+        }
+    }
+
+    protected Object readCommand() throws IOException {
+        return wireFormat.unmarshal(dataIn);
+    }
+
+    /**
+     * Configures the socket for use
+     *
+     * @param sock
+     *        the socket
+     * @throws SocketException
+     *         , IllegalArgumentException if setting the options on the socket
+     *         failed.
+     */
+    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
+        try {
+            sock.setReceiveBufferSize(socketBufferSize);
+            sock.setSendBufferSize(socketBufferSize);
+        } catch (SocketException se) {
+            LOG.warn("Cannot set socket buffer size = {}", socketBufferSize);
+            LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.",
+                      se.getMessage(), se);
+        }
+    }
+
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            boolean success = false;
+            stopped.set(false);
+            try {
+                doStart();
+                success = true;
+            } finally {
+                started.set(success);
+            }
+        }
+    }
+
+    public void stop() throws Exception {
+        if (stopped.compareAndSet(false, true)) {
+            stopping.set(true);
+            ServiceStopper stopper = new ServiceStopper();
+            try {
+                doStop(stopper);
+            } catch (Exception e) {
+                stopper.onException(this, e);
+            }
+            stopped.set(true);
+            started.set(false);
+            stopping.set(false);
+            stopper.throwFirstException();
+        }
+
+        CountDownLatch countDownLatch = stoppedLatch.get();
+        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
+            countDownLatch.await(1, TimeUnit.SECONDS);
+        }
+    }
+
+    protected void doStart() throws Exception {
+        connect();
+        stoppedLatch.set(new CountDownLatch(1));
+
+        runnerThread = new Thread(null, this, "OpenWire Test Transport: " + toString());
+        runnerThread.setDaemon(false);
+        runnerThread.start();
+    }
+
+    protected void connect() throws Exception {
+        if (socket == null && socketFactory == null) {
+            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+        }
+
+        InetSocketAddress remoteAddress = null;
+
+        if (remoteLocation != null) {
+            remoteAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort());
+        }
+
+        socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
+
+        initialiseSocket(socket);
+        initializeStreams();
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        LOG.debug("Stopping transport {}", this);
+
+        if (socket != null) {
+            LOG.trace("Closing socket {}", socket);
+            try {
+                socket.close();
+                LOG.debug("Closed socket {}", socket);
+            } catch (IOException e) {
+                LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e);
+            }
+        }
+    }
+
+    protected void initializeStreams() throws Exception {
+        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
+            @Override
+            public int read() throws IOException {
+                receiveCounter++;
+                return super.read();
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                receiveCounter++;
+                return super.read(b, off, len);
+            }
+
+            @Override
+            public long skip(long n) throws IOException {
+                receiveCounter++;
+                return super.skip(n);
+            }
+
+            @Override
+            protected void fill() throws IOException {
+                receiveCounter++;
+                super.fill();
+            }
+        };
+        this.dataIn = new DataInputStream(buffIn);
+        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+        this.dataOut = new DataOutputStream(outputStream);
+    }
+
+    protected void closeStreams() throws IOException {
+        if (dataOut != null) {
+            dataOut.close();
+        }
+        if (dataIn != null) {
+            dataIn.close();
+        }
+    }
+
+    /**
+     * Process the inbound command
+     */
+    public void doConsume(Object command) {
+        if (command != null) {
+            if (transportListener != null) {
+                transportListener.onCommand(command);
+            } else {
+                LOG.error("No transportListener available to process inbound command: {}", command);
+            }
+        }
+    }
+
+    /**
+     * Passes any IO exceptions into the transport listener
+     */
+    public void onException(IOException e) {
+        if (transportListener != null) {
+            try {
+                transportListener.onException(e);
+            } catch (RuntimeException e2) {
+                // Handle any unexpected runtime exceptions by debug logging them.
+                LOG.debug("Unexpected runtime exception: " + e2, e2);
+            }
+        }
+    }
+
+    public String getRemoteAddress() {
+        if (socket != null) {
+            SocketAddress address = socket.getRemoteSocketAddress();
+            if (address instanceof InetSocketAddress) {
+                return "tcp://" + ((InetSocketAddress) address).getAddress().getHostAddress() + ":" + ((InetSocketAddress) address).getPort();
+            } else {
+                return "" + socket.getRemoteSocketAddress();
+            }
+        }
+        return null;
+    }
+
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
+
+    public OpenWireFormat getWireFormat() {
+        return wireFormat;
+    }
+
+    /**
+     * @return true if this service has been started
+     */
+    public boolean isStarted() {
+        return started.get();
+    }
+
+    /**
+     * @return true if this service is in the process of closing
+     */
+    public boolean isStopping() {
+        return stopping.get();
+    }
+
+    /**
+     * @return true if this service is closed
+     */
+    public boolean isStopped() {
+        return stopped.get();
+    }
+
+    /**
+     * Returns the current transport listener
+     */
+    public TransportListener getTransportListener() {
+        return transportListener;
+    }
+
+    /**
+     * Registers an inbound command listener
+     *
+     * @param commandListener
+     */
+    public void setTransportListener(TransportListener commandListener) {
+        this.transportListener = commandListener;
+    }
+
+    public int getMinmumWireFormatVersion() {
+        return minmumWireFormatVersion;
+    }
+
+    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+        this.minmumWireFormatVersion = minmumWireFormatVersion;
+    }
+
+    public int getSocketBufferSize() {
+        return socketBufferSize;
+    }
+
+    /**
+     * Sets the buffer size to use on the socket
+     */
+    public void setSocketBufferSize(int socketBufferSize) {
+        this.socketBufferSize = socketBufferSize;
+    }
+
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    /**
+     * Sets the timeout used to connect to the socket
+     */
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    /**
+     * @return the ioBufferSize
+     */
+    public int getIoBufferSize() {
+        return this.ioBufferSize;
+    }
+
+    /**
+     * @param ioBufferSize
+     *        the ioBufferSize to set
+     */
+    public void setIoBufferSize(int ioBufferSize) {
+        this.ioBufferSize = ioBufferSize;
+    }
+
+    /**
+     * @return pretty print of 'this'
+     */
+    @Override
+    public String toString() {
+        return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() + "@" + socket.getLocalPort() : remoteLocation);
+    }
+
+    protected void checkStarted() throws IOException {
+        if (!isStarted()) {
+            throw new IOException("The transport is not running.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/TransportListener.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/TransportListener.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/TransportListener.java
new file mode 100644
index 0000000..81c886b
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/TransportListener.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.util;
+
+import java.io.IOException;
+
+/**
+ * An asynchronous listener of commands
+ */
+public interface TransportListener {
+
+    /**
+     * called to process a command
+     *
+     * @param command
+     */
+    void onCommand(Object command);
+
+    /**
+     * An unrecoverable exception has occurred on the transport
+     *
+     * @param error
+     */
+    void onException(IOException error);
+
+    /**
+     * The transport has suffered an interruption from which it hopes to recover
+     *
+     */
+    void transportInterupted();
+
+    /**
+     * The transport has resumed after an interruption
+     *
+     */
+    void transportResumed();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/Wait.java
----------------------------------------------------------------------
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/Wait.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/Wait.java
new file mode 100644
index 0000000..2bc1e61
--- /dev/null
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/util/Wait.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class Wait {
+
+    public static final long MAX_WAIT_MILLIS = 30 * 1000;
+    public static final int SLEEP_MILLIS = 1000;
+
+    public interface Condition {
+        boolean isSatisified() throws Exception;
+    }
+
+    public static boolean waitFor(Condition condition) throws Exception {
+        return waitFor(condition, MAX_WAIT_MILLIS);
+    }
+
+    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
+        return waitFor(condition, duration, SLEEP_MILLIS);
+    }
+
+    public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception {
+
+        final long expiry = System.currentTimeMillis() + duration;
+        boolean conditionSatisified = condition.isSatisified();
+        while (!conditionSatisified && System.currentTimeMillis() < expiry) {
+            TimeUnit.MILLISECONDS.sleep(sleepMillis);
+            conditionSatisified = condition.isSatisified();
+        }
+        return conditionSatisified;
+    }
+}


Mime
View raw message