activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5848
Date Wed, 08 Jul 2015 21:45:07 GMT
Repository: activemq
Updated Branches:
  refs/heads/master b0952d874 -> 13044decc


https://issues.apache.org/jira/browse/AMQ-5848

Use the latest OpenWire version marshallers in the KahaDB store when
starting from a clean install; drop back to the version used in the
existing store if one is found.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/13044dec
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/13044dec
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/13044dec

Branch: refs/heads/master
Commit: 13044deccea1e1bd6849dbe0babb6720aed634ac
Parents: b0952d8
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Jul 8 17:29:32 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Jul 8 17:29:32 2015 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/AMQ4563Test.java    |   5 -
 .../transport/amqp/AmqpTestSupport.java         |   6 -
 .../apache/activemq/broker/BrokerService.java   |   2 +-
 .../apache/activemq/command/CommandTypes.java   |   4 +-
 .../activemq/openwire/OpenWireFormat.java       |  44 +++--
 .../activemq/store/kahadb/KahaDBStore.java      |  10 +-
 .../activemq/store/kahadb/MessageDatabase.java  |   4 +-
 .../activemq/store/kahadb/AMQ5626Test.java      |  63 ++-----
 .../JournalCorruptionEofIndexRecoveryTest.java  |  43 ++---
 .../JournalCorruptionIndexRecoveryTest.java     |  42 ++---
 .../kahadb/KahaDBStoreOpenWireVersionTest.java  | 187 +++++++++++++++++++
 .../transport/tcp/InactivityMonitorTest.java    |  29 ++-
 12 files changed, 298 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
index 099944d..6520caa 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java
@@ -222,9 +222,4 @@ public class AMQ4563Test extends AmqpTestSupport {
     protected boolean isPersistent() {
         return true;
     }
-
-    @Override
-    protected int getStoreOpenWireVersion() {
-        return 10;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 6153762..91909d4 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -45,7 +45,6 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
 import org.apache.activemq.broker.jmx.TopicViewMBean;
-import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.spring.SpringSslContext;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
@@ -102,7 +101,6 @@ public class AmqpTestSupport {
             KahaDBStore kaha = new KahaDBStore();
             kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
             brokerService.setPersistenceAdapter(kaha);
-            brokerService.setStoreOpenWireVersion(getStoreOpenWireVersion());
         }
         brokerService.setSchedulerSupport(false);
         brokerService.setAdvisorySupport(false);
@@ -188,10 +186,6 @@ public class AmqpTestSupport {
         return true;
     }
 
-    protected int getStoreOpenWireVersion() {
-        return OpenWireFormat.DEFAULT_WIRE_VERSION;
-    }
-
     protected boolean isUseOpenWireConnector() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 8aa6f95..0290a76 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -251,7 +251,7 @@ public class BrokerService implements Service {
     private boolean restartRequested = false;
     private boolean rejectDurableConsumers = false;
 
-    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
+    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
 
     static {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java b/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java
index 49ed9cb..e4bf463 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java
@@ -27,8 +27,10 @@ public interface CommandTypes {
     byte PROTOCOL_VERSION = 11;
 
     // What is the latest version of the openwire protocol used in the stores
-    byte PROTOCOL_STORE_VERSION = 6;
+    byte PROTOCOL_STORE_VERSION = 11;
 
+    // What is the legacy version that old KahaDB store's most commonly used
+    byte PROTOCOL_LEGACY_STORE_VERSION = 6;
 
     // A marshaling layer can use this type to specify a null object.
     byte NULL = 0;

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
index e605a4e..f70bff8 100755
--- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
+++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
@@ -33,13 +33,14 @@ import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
- * 
- * 
+ *
+ *
  */
 public final class OpenWireFormat implements WireFormat {
 
-    public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
+    public static final int DEFAULT_STORE_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
     public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
+    public static final int DEFAULT_LEGACY_VERSION = CommandTypes.PROTOCOL_LEGACY_STORE_VERSION;
     public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
 
     static final byte NULL_TYPE = CommandTypes.NULL;
@@ -64,15 +65,16 @@ public final class OpenWireFormat implements WireFormat {
     private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
     private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
     private WireFormatInfo preferedWireFormatInfo;
-    
+
     public OpenWireFormat() {
-        this(DEFAULT_VERSION);
+        this(DEFAULT_STORE_VERSION);
     }
 
     public OpenWireFormat(int i) {
         setVersion(i);
     }
 
+    @Override
     public int hashCode() {
         return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
                ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
@@ -91,6 +93,7 @@ public final class OpenWireFormat implements WireFormat {
         return answer;
     }
 
+    @Override
     public boolean equals(Object object) {
         if (object == null) {
             return false;
@@ -102,6 +105,7 @@ public final class OpenWireFormat implements WireFormat {
     }
 
 
+    @Override
     public String toString() {
         return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
                + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + "}";
@@ -109,10 +113,12 @@ public final class OpenWireFormat implements WireFormat {
         // tightEncodingEnabled="+tightEncodingEnabled+"}";
     }
 
+    @Override
     public int getVersion() {
         return version;
     }
 
+    @Override
     public synchronized ByteSequence marshal(Object command) throws IOException {
 
         if (cacheEnabled) {
@@ -125,7 +131,7 @@ public final class OpenWireFormat implements WireFormat {
 
             DataStructure c = (DataStructure)command;
             byte type = c.getDataStructureType();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -173,6 +179,7 @@ public final class OpenWireFormat implements WireFormat {
         return sequence;
     }
 
+    @Override
     public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
         bytesIn.restart(sequence);
         // DataInputStream dis = new DataInputStream(new
@@ -197,6 +204,7 @@ public final class OpenWireFormat implements WireFormat {
         return command;
     }
 
+    @Override
     public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
 
         if (cacheEnabled) {
@@ -208,7 +216,7 @@ public final class OpenWireFormat implements WireFormat {
 
             DataStructure c = (DataStructure)o;
             byte type = c.getDataStructureType();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -246,12 +254,13 @@ public final class OpenWireFormat implements WireFormat {
 
         } else {
             if (!sizePrefixDisabled) {
-            	dataOut.writeInt(size);
+                dataOut.writeInt(size);
             }
             dataOut.writeByte(NULL_TYPE);
         }
     }
 
+    @Override
     public Object unmarshal(DataInput dis) throws IOException {
         DataInput dataIn = dis;
         if (!sizePrefixDisabled) {
@@ -276,7 +285,7 @@ public final class OpenWireFormat implements WireFormat {
         if (o != null) {
             DataStructure c = (DataStructure)o;
             byte type = c.getDataStructureType();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -299,7 +308,7 @@ public final class OpenWireFormat implements WireFormat {
         if (o != null) {
             DataStructure c = (DataStructure)o;
             byte type = c.getDataStructureType();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -312,9 +321,10 @@ public final class OpenWireFormat implements WireFormat {
     /**
      * Allows you to dynamically switch the version of the openwire protocol
      * being used.
-     * 
+     *
      * @param version
      */
+    @Override
     public void setVersion(int version) {
         String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
         Class mfClass;
@@ -343,7 +353,7 @@ public final class OpenWireFormat implements WireFormat {
     public Object doUnmarshal(DataInput dis) throws IOException {
         byte dataType = dis.readByte();
         if (dataType != NULL_TYPE) {
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + dataType);
             }
@@ -382,7 +392,7 @@ public final class OpenWireFormat implements WireFormat {
         }
 
         byte type = o.getDataStructureType();
-        DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+        DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
         if (dsm == null) {
             throw new IOException("Unknown data type: " + type);
         }
@@ -409,7 +419,7 @@ public final class OpenWireFormat implements WireFormat {
 
         } else {
 
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }
@@ -422,7 +432,7 @@ public final class OpenWireFormat implements WireFormat {
         if (bs.readBoolean()) {
 
             byte dataType = dis.readByte();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + dataType);
             }
@@ -455,7 +465,7 @@ public final class OpenWireFormat implements WireFormat {
         if (dis.readBoolean()) {
 
             byte dataType = dis.readByte();
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + dataType);
             }
@@ -473,7 +483,7 @@ public final class OpenWireFormat implements WireFormat {
         if (o != null) {
             byte type = o.getDataStructureType();
             dataOut.writeByte(type);
-            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
+            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
             if (dsm == null) {
                 throw new IOException("Unknown data type: " + type);
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 89ee40c..bf14d69 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -209,9 +209,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             // In case the recovered store used a different OpenWire version log a warning
             // to assist in determining why journal reads fail.
             if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
-                LOG.warn("Recovered Store uses a different OpenWire version[{}] " +
-                         "than the version configured[{}].",
+                LOG.warn("Existing Store uses a different OpenWire version[{}] " +
+                         "than the version configured[{}] reverting to the version " +
+                         "used by this store, some newer broker features may not work" +
+                         "as expected.",
                          metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
+
+                // Update the broker service instance to the actual version in use.
+                wireFormat.setVersion(metadata.openwireVersion);
+                brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 2fa4bb1..7aa36c3 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -132,7 +132,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
         protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
         protected int version = VERSION;
-        protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION;
+        protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
 
         public void read(DataInput is) throws IOException {
             state = is.readInt();
@@ -168,7 +168,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             try {
                 openwireVersion = is.readInt();
             } catch (EOFException expectedOnUpgrade) {
-                openwireVersion = OpenWireFormat.DEFAULT_VERSION;
+                openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
             }
             LOG.info("KahaDB is version " + version);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java
index 9097af7..43cbf13 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java
@@ -17,16 +17,21 @@
 
 package org.apache.activemq.store.kahadb;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
 import javax.jms.ConnectionFactory;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.management.ObjectName;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -42,14 +47,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-
 public class AMQ5626Test {
 
     private static final Logger LOG = LoggerFactory.getLogger(AMQ5626Test.class);
-    private static final String QUEUE_NAME = "TesQ";
+
+    private final String QUEUE_NAME = "TesQ";
+    private final String KAHADB_DIRECTORY = "target/activemq-data/";
+
     private BrokerService brokerService;
     private URI brokerUri;
 
@@ -58,6 +62,15 @@ public class AMQ5626Test {
         createBroker(true);
     }
 
+    @After
+    public void teardown() throws Exception {
+        try {
+            brokerService.stop();
+        } catch (Exception ex) {
+            LOG.error("FAILED TO STOP/START BROKER EXCEPTION", ex);
+        }
+    }
+
     private void createBroker(boolean deleteMessagesOnStart) throws Exception {
 
         brokerService = new BrokerService();
@@ -79,7 +92,7 @@ public class AMQ5626Test {
         transportConnector.setName("openwire");
         transportConnector.setUri(new URI("tcp://0.0.0.0:0"));
         brokerService.addConnector(transportConnector);
-
+        brokerService.setDataDirectory(KAHADB_DIRECTORY);
         brokerService.setDeleteAllMessagesOnStartup(deleteMessagesOnStart);
         brokerService.getManagementContext().setCreateConnector(false);
         brokerService.start();
@@ -88,7 +101,7 @@ public class AMQ5626Test {
         brokerUri = transportConnector.getPublishableConnectURI();
     }
 
-    @Test
+    @Test(timeout = 30000)
     public void testPriorityMessages() throws Exception {
 
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri);
@@ -96,9 +109,7 @@ public class AMQ5626Test {
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-
         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
-
         Message message = session.createMessage();
 
         // 0,1
@@ -153,33 +164,12 @@ public class AMQ5626Test {
                 LOG.info("QueueView enqueue count : " + queueView.getEnqueueCount());
                 LOG.info("QueueView dequeue count : " + queueView.getDequeueCount());
                 LOG.info("QueueView inflight count : " + queueView.getInFlightCount());
-
-            }
-        }
-    }
-
-    private QueueView getQueueView(BrokerService broker, String queueName) throws Exception {
-        Map<ObjectName, DestinationView> queueViews = broker.getAdminView().getBroker().getQueueViews();
-
-        for (ObjectName key : queueViews.keySet()) {
-            DestinationView destinationView = queueViews.get(key);
-
-            if (destinationView instanceof QueueView) {
-                QueueView queueView = (QueueView) destinationView;
-
-                if (queueView.getName().equals(queueName)) {
-                    return queueView;
-                }
-
             }
         }
-        return null;
     }
 
     private synchronized void stopRestartBroker() {
-
         try {
-
             LOG.info(">>>SHUTTING BROKER DOWN");
             brokerService.stop();
             brokerService.waitUntilStopped();
@@ -190,22 +180,9 @@ public class AMQ5626Test {
             brokerService.waitUntilStarted();
 
             LOG.info(">>>BROKER RESTARTED..");
-
         } catch (Exception e) {
             LOG.error("FAILED TO STOP/START BROKER EXCEPTION", e);
             fail("FAILED TO STOP/START BROKER" + e);
         }
     }
-
-    @After
-    public void teardown() throws Exception {
-
-        try {
-            brokerService.stop();
-        } catch (Exception ex) {
-            LOG.error("FAILED TO STOP/START BROKER EXCEPTION", ex);
-        }
-
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index bb56e7d..a39496d 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -16,16 +16,21 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -40,26 +45,23 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
 public class JournalCorruptionEofIndexRecoveryTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionEofIndexRecoveryTest.class);
 
-    ActiveMQConnectionFactory cf = null;
-    BrokerService broker = null;
-    private final Destination destination = new ActiveMQQueue("Test");
+    private ActiveMQConnectionFactory cf = null;
+    private BrokerService broker = null;
     private String connectionUri;
     private KahaDBPersistenceAdapter adapter;
 
+    private final Destination destination = new ActiveMQQueue("Test");
+    private final String KAHADB_DIRECTORY = "target/activemq-data/";
+    private final String payload = new String(new byte[1024]);
 
     protected void startBroker() throws Exception {
         doStartBroker(true, false);
     }
 
-
     protected void restartBroker(boolean whackIndex) throws Exception {
         restartBroker(whackIndex, false);
     }
@@ -83,6 +85,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
 
     private void doStartBroker(boolean delete, boolean forceRecoverIndex) throws Exception {
         broker = new BrokerService();
+        broker.setDataDirectory(KAHADB_DIRECTORY);
+
         if (delete) {
             IOHelper.deleteChildren(broker.getPersistenceAdapter().getDirectory());
             IOHelper.delete(broker.getPersistenceAdapter().getDirectory());
@@ -118,7 +122,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
 
         adapter.setPreallocationStrategy("zeros");
         adapter.setPreallocationScope("entire_journal");
-
     }
 
     @After
@@ -129,7 +132,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
         }
     }
 
-
     @Test
     public void testRecoveryAfterCorruptionEof() throws Exception {
         startBroker();
@@ -145,9 +147,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
         restartBroker(false);
 
         assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount());
-
         assertEquals("Drain", 49, drainQueue(49));
-
     }
 
     @Test
@@ -161,9 +161,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
         restartBroker(true);
 
         assertEquals("missing one message", 3, broker.getAdminView().getTotalMessageCount());
-
         assertEquals("Drain", 3, drainQueue(4));
-
     }
 
     @Test
@@ -177,16 +175,13 @@ public class JournalCorruptionEofIndexRecoveryTest {
         restartBroker(false);
 
         assertEquals("unnoticed", 4, broker.getAdminView().getTotalMessageCount());
-
         assertEquals("Drain", 0, drainQueue(4));
 
         // force recover index and loose one message
         restartBroker(false, true);
 
         assertEquals("missing one index recreation", 3, broker.getAdminView().getTotalMessageCount());
-
         assertEquals("Drain", 3, drainQueue(4));
-
     }
 
     @Test
@@ -200,7 +195,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
         restartBroker(false, true);
 
         assertEquals("Drain", numToSend, drainQueue(numToSend));
-
     }
 
     private void corruptBatchCheckSumSplash(int id) throws Exception{
@@ -230,7 +224,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
         randomAccessFile.writeInt(size);
 
         randomAccessFile.getChannel().force(true);
-
     }
 
     private void corruptBatchEndEof(int id) throws Exception{
@@ -246,7 +239,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
         randomAccessFile.writeInt(31 * 1024 * 1024);
         randomAccessFile.writeLong(0l);
         randomAccessFile.getChannel().force(true);
-
     }
 
     private ArrayList<Integer> findBatch(RecoverableRandomAccessFile randomAccessFile, int where) throws IOException {
@@ -269,11 +261,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
         return batchPositions;
     }
 
-
     private int getNumberOfJournalFiles() throws IOException {
-
-        Collection<DataFile> files =
-                ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
         int reality = 0;
         for (DataFile file : files) {
             if (file != null) {
@@ -283,11 +272,9 @@ public class JournalCorruptionEofIndexRecoveryTest {
         return reality;
     }
 
-
     private int produceMessages(Destination destination, int numToSend) throws Exception {
         int sent = 0;
-        Connection connection = new ActiveMQConnectionFactory(
-                broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
         connection.start();
         try {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -307,8 +294,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
         return produceMessages(destination, numToSend);
     }
 
-    final String payload = new String(new byte[1024]);
-
     private Message createMessage(Session session, int i) throws Exception {
         return session.createTextMessage(payload + "::" + i);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
index 821dfd9..84c2ab5 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
@@ -16,16 +16,21 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -40,18 +45,16 @@ import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
 @RunWith(Parameterized.class)
 public class JournalCorruptionIndexRecoveryTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionIndexRecoveryTest.class);
 
-    ActiveMQConnectionFactory cf = null;
-    BrokerService broker = null;
+    private final String KAHADB_DIRECTORY = "target/activemq-data/";
+    private final String payload = new String(new byte[1024]);
+
+    private ActiveMQConnectionFactory cf = null;
+    private BrokerService broker = null;
     private final Destination destination = new ActiveMQQueue("Test");
     private String connectionUri;
     private KahaDBPersistenceAdapter adapter;
@@ -69,7 +72,6 @@ public class JournalCorruptionIndexRecoveryTest {
         doStartBroker(true);
     }
 
-
     protected void restartBroker() throws Exception {
         File dataDir = broker.getPersistenceAdapter().getDirectory();
 
@@ -83,12 +85,12 @@ public class JournalCorruptionIndexRecoveryTest {
         doStartBroker(false);
     }
 
-
     private void doStartBroker(boolean delete) throws Exception {
         broker = new BrokerService();
         broker.setDeleteAllMessagesOnStartup(delete);
         broker.setPersistent(true);
         broker.setUseJmx(true);
+        broker.setDataDirectory(KAHADB_DIRECTORY);
         broker.addConnector("tcp://localhost:0");
 
         configurePersistence(broker);
@@ -112,7 +114,6 @@ public class JournalCorruptionIndexRecoveryTest {
 
         adapter.setCheckForCorruptJournalFiles(true);
         adapter.setIgnoreMissingJournalfiles(true);
-
     }
 
     @After
@@ -138,11 +139,9 @@ public class JournalCorruptionIndexRecoveryTest {
         restartBroker();
 
         assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount());
-
         assertEquals("Drain", 49, drainQueue(49));
     }
 
-
     @Test
     public void testRecoveryAfterCorruptionEnd() throws Exception {
         startBroker();
@@ -158,9 +157,7 @@ public class JournalCorruptionIndexRecoveryTest {
         restartBroker();
 
         assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount());
-
         assertEquals("Drain", 49, drainQueue(49));
-
     }
 
     @Test
@@ -180,15 +177,12 @@ public class JournalCorruptionIndexRecoveryTest {
 
         assertEquals("missing one message", 48, broker.getAdminView().getTotalMessageCount());
         assertEquals("Drain", 48, drainQueue(48));
-
     }
 
     private void whackIndex(File dataDir) {
-
         File indexToDelete = new File(dataDir, "db.data");
         LOG.info("Whacking index: " + indexToDelete);
         indexToDelete.delete();
-
     }
 
     private void corruptBatchMiddle(int i) throws IOException {
@@ -201,8 +195,7 @@ public class JournalCorruptionIndexRecoveryTest {
 
     private void corruptBatch(int id, boolean atEnd) throws IOException {
 
-        Collection<DataFile> files =
-                ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
         DataFile dataFile = (DataFile) files.toArray()[id];
 
         RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
@@ -232,11 +225,8 @@ public class JournalCorruptionIndexRecoveryTest {
         randomAccessFile.write(bla, 0, bla.length);
     }
 
-
     private int getNumberOfJournalFiles() throws IOException {
-
-        Collection<DataFile> files =
-                ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
         int reality = 0;
         for (DataFile file : files) {
             if (file != null) {
@@ -246,11 +236,9 @@ public class JournalCorruptionIndexRecoveryTest {
         return reality;
     }
 
-
     private int produceMessages(Destination destination, int numToSend) throws Exception {
         int sent = 0;
-        Connection connection = new ActiveMQConnectionFactory(
-                broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
         connection.start();
         try {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -270,8 +258,6 @@ public class JournalCorruptionIndexRecoveryTest {
         return produceMessages(destination, numToSend);
     }
 
-    final String payload = new String(new byte[1024]);
-
     private Message createMessage(Session session, int i) throws Exception {
         return session.createTextMessage(payload + "::" + i);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
new file mode 100644
index 0000000..5b272db
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KahaDBStoreOpenWireVersionTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KahaDBStoreOpenWireVersionTest.class);
+
+    private final String KAHADB_DIRECTORY_BASE = "./target/activemq-data/";
+    private final int NUM_MESSAGES = 10;
+
+    private BrokerService broker = null;
+    private String storeDir;
+
+    @Rule public TestName name = new TestName();
+
+    protected BrokerService createBroker(int storeOpenWireVersion) throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.setDataDirectory(storeDir);
+        broker.setStoreOpenWireVersion(storeOpenWireVersion);
+        broker.start();
+        broker.waitUntilStarted();
+
+        return broker;
+    }
+
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("=============== Starting test {} ================", name.getMethodName());
+        storeDir = KAHADB_DIRECTORY_BASE + name.getMethodName();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        File brokerStoreDir = new File(KAHADB_DIRECTORY_BASE);
+
+        if (broker != null) {
+            brokerStoreDir = broker.getPersistenceAdapter().getDirectory();
+            stopBroker();
+        }
+
+        IOHelper.deleteChildren(brokerStoreDir);
+        IOHelper.delete(brokerStoreDir);
+
+        LOG.info("=============== Finished test {} ================", name.getMethodName());
+    }
+
+    @Test(timeout = 60000)
+    public void testConfiguredVersionWorksOnReload() throws Exception {
+        final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION - 1;
+        final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION - 1;
+
+        doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION);
+    }
+
+    @Test(timeout = 60000)
+    public void testOlderVersionWorksWithDefaults() throws Exception {
+        final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_LEGACY_VERSION;
+        final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION;
+
+        doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION);
+    }
+
+    @Test(timeout = 60000)
+    public void testNewerVersionWorksWhenOlderIsConfigured() throws Exception {
+        final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION;
+        final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_LEGACY_VERSION;
+
+        doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION);
+    }
+
+    private void doTestStoreVersionConfigrationOverrides(int create, int reload) throws Exception {
+        createBroker(create);
+        populateStore();
+        stopBroker();
+
+        createBroker(reload);
+        assertEquals(create, broker.getStoreOpenWireVersion());
+        assertStoreIsUsable();
+    }
+
+    private void populateStore() throws Exception {
+
+        ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        Connection connection = factory.createConnection();
+        connection.setClientID("test");
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("test.topic");
+        Queue queue = session.createQueue("test.queue");
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "test");
+        consumer.close();
+
+        MessageProducer producer = session.createProducer(topic);
+        producer.setPriority(9);
+        for (int i = 0; i < NUM_MESSAGES; i++) {
+            Message msg = session.createTextMessage("test message:" + i);
+            producer.send(msg);
+        }
+        LOG.info("sent {} to topic", NUM_MESSAGES);
+
+        producer = session.createProducer(queue);
+        for (int i = 0; i < NUM_MESSAGES; i++) {
+            Message msg = session.createTextMessage("test message:" + i);
+            producer.send(msg);
+        }
+        LOG.info("sent {} to topic", NUM_MESSAGES);
+
+        connection.close();
+    }
+
+    private void assertStoreIsUsable() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        connection.setClientID("test");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("test.topic");
+        Queue queue = session.createQueue("test.queue");
+
+        MessageConsumer queueConsumer = session.createConsumer(queue);
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            TextMessage received = (TextMessage) queueConsumer.receive(1000);
+            assertNotNull(received);
+        }
+        LOG.info("Consumed {} from queue", NUM_MESSAGES);
+
+        MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test");
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            TextMessage received = (TextMessage) topicConsumer.receive(1000);
+            assertNotNull(received);
+        }
+        LOG.info("Consumed {} from topic", NUM_MESSAGES);
+
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/13044dec/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index 919c5da..a25b094 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener {
+
     private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitorTest.class);
 
     public Runnable serverRunOnCommand;
@@ -54,6 +55,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
     private final AtomicBoolean ignoreClientError = new AtomicBoolean(false);
     private final AtomicBoolean ignoreServerError = new AtomicBoolean(false);
 
+    @Override
     protected void setUp() throws Exception {
         super.setUp();
         startTransportServer();
@@ -66,6 +68,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
     private void startClient() throws Exception, URISyntaxException {
         clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
         clientTransport.setTransportListener(new TransportListener() {
+            @Override
             public void onCommand(Object command) {
                 clientReceiveCount.incrementAndGet();
                 if (clientRunOnCommand != null) {
@@ -73,6 +76,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
                 }
             }
 
+            @Override
             public void onException(IOException error) {
                 if (!ignoreClientError.get()) {
                     LOG.info("Client transport error:");
@@ -81,12 +85,15 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
                 }
             }
 
+            @Override
             public void transportInterupted() {
             }
 
+            @Override
             public void transportResumed() {
             }
         });
+
         clientTransport.start();
     }
 
@@ -103,6 +110,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         serverPort = server.getSocketAddress().getPort();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         ignoreClientError.set(true);
         ignoreServerError.set(true);
@@ -122,11 +130,13 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         super.tearDown();
     }
 
+    @Override
     public void onAccept(Transport transport) {
         try {
             LOG.info("[" + getName() + "] Server Accepted a Connection");
             serverTransport = transport;
             serverTransport.setTransportListener(new TransportListener() {
+                @Override
                 public void onCommand(Object command) {
                     serverReceiveCount.incrementAndGet();
                     if (serverRunOnCommand != null) {
@@ -134,6 +144,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
                     }
                 }
 
+                @Override
                 public void onException(IOException error) {
                     if (!ignoreClientError.get()) {
                         LOG.info("Server transport error:", error);
@@ -141,9 +152,11 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
                     }
                 }
 
+                @Override
                 public void transportInterupted() {
                 }
 
+                @Override
                 public void transportResumed() {
                 }
             });
@@ -153,18 +166,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         }
     }
 
+    @Override
     public void onAcceptError(Exception error) {
         LOG.trace(error.toString());
     }
 
     public void testClientHang() throws Exception {
-
-        //
         // Manually create a client transport so that it does not send KeepAlive
-        // packets.
-        // this should simulate a client hang.
+        // packets.  this should simulate a client hang.
         clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
         clientTransport.setTransportListener(new TransportListener() {
+            @Override
             public void onCommand(Object command) {
                 clientReceiveCount.incrementAndGet();
                 if (clientRunOnCommand != null) {
@@ -172,6 +184,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
                 }
             }
 
+            @Override
             public void onException(IOException error) {
                 if (!ignoreClientError.get()) {
                     LOG.info("Client transport error:");
@@ -180,15 +193,18 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
                 }
             }
 
+            @Override
             public void transportInterupted() {
             }
 
+            @Override
             public void transportResumed() {
             }
         });
+
         clientTransport.start();
         WireFormatInfo info = new WireFormatInfo();
-        info.setVersion(OpenWireFormat.DEFAULT_VERSION);
+        info.setVersion(OpenWireFormat.DEFAULT_LEGACY_VERSION);
         info.setMaxInactivityDuration(1000);
         clientTransport.oneway(info);
 
@@ -223,12 +239,12 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
      * @throws URISyntaxException
      */
     public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
-
         startClient();
 
         addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
         addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});
         addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() {
+            @Override
             public void run() {
                 try {
                     LOG.info("Sleeping");
@@ -251,5 +267,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         assertEquals(0, clientErrorCount.get());
         assertEquals(0, serverErrorCount.get());
     }
-
 }


Mime
View raw message