bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [34/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
Date Wed, 16 Mar 2016 03:44:44 GMT
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
deleted file mode 100644
index a927279..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-// see https://issues.apache.org/activemq/browse/AMQ-2985
-// this demonstrated receiving old messages eventually along with validating order receipt
-public class DurableSubProcessTest extends org.apache.activemq.TestSupport  {
-    private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessTest.class);
-    // public static final long RUNTIME = 4 * 60 * 1000;
-    public static final long RUNTIME = 60 * 1000;
-
-    public static final int SERVER_SLEEP = 2 * 1000; // max
-    public static final int CARGO_SIZE = 10; // max
-
-    public static final int MAX_CLIENTS = 7;
-    public static final Random CLIENT_LIFETIME = new Random(30 * 1000, 2 * 60 * 1000);
-    public static final Random CLIENT_ONLINE = new Random(2 * 1000, 15 * 1000);
-    public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 20 * 1000);
-
-    public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
-
-
-    private Topic topic;
-
-    private ClientManager clientManager;
-    private Server server;
-    private HouseKeeper houseKeeper;
-
-    static final Vector<Throwable> exceptions = new Vector<Throwable>();
-
-    public void testProcess() {
-        try {
-            server.start();
-            clientManager.start();
-
-            if (ALLOW_SUBSCRIPTION_ABANDONMENT)
-                houseKeeper.start();
-
-            Thread.sleep(RUNTIME);
-            assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-        }
-        catch (Throwable e) {
-            exit("DurableSubProcessTest.testProcess failed.", e);
-        }
-        LOG.info("DONE.");
-    }
-
-    /**
-     * Creates batch of messages in a transaction periodically.
-     * The last message in the transaction is always a special
-     * message that contains info about the whole transaction.
-     * <p>Notifies the clients about the created messages also.
-     */
-    final class Server extends Thread {
-
-        final ConnectionFactory cf = new HedwigConnectionFactoryImpl();
-
-        final Object sendMutex = new Object();
-        final String[] cargos = new String[500];
-
-        int transRover = 0;
-        int messageRover = 0;
-
-        public Server() {
-            super("Server");
-            setDaemon(true);
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    DurableSubProcessTest.sleepRandom(SERVER_SLEEP);
-                    send();
-                }
-            }
-            catch (Throwable e) {
-                exit("Server.run failed", e);
-            }
-        }
-
-        public void send() throws JMSException {
-            // do not create new clients now
-            // ToDo: Test this case later.
-            synchronized (sendMutex) {
-                int trans = ++transRover;
-                boolean relevantTrans = random(2) > 1;
-                ClientType clientType = relevantTrans ? ClientType.randomClientType() : null; // sends this types
-                int count = random(200);
-
-                LOG.info("Sending Trans[id=" + trans + ", count=" + count + ", clientType=" + clientType + "]");
-
-                Connection con = cf.createConnection();
-                Session sess = con.createSession(true, Session.AUTO_ACKNOWLEDGE);
-                MessageProducer prod = sess.createProducer(null);
-
-                for (int i = 0; i < count; i++) {
-                    Message message = sess.createMessage();
-                    message.setIntProperty("ID", ++messageRover);
-                    String type = clientType != null ? clientType.randomMessageType() :
-                        ClientType.randomNonRelevantMessageType();
-                    message.setStringProperty("TYPE", type);
-
-                    if (CARGO_SIZE > 0)
-                        message.setStringProperty("CARGO", getCargo(CARGO_SIZE));
-
-                    prod.send(topic, message);
-                    clientManager.onServerMessage(message);
-                }
-
-                Message message = sess.createMessage();
-                message.setIntProperty("ID", ++messageRover);
-                message.setIntProperty("TRANS", trans);
-                message.setBooleanProperty("COMMIT", true);
-                message.setBooleanProperty("RELEVANT", relevantTrans);
-                prod.send(topic, message);
-                clientManager.onServerMessage(message);
-
-                sess.commit();
-                sess.close();
-                con.close();
-            }
-        }
-
-        private String getCargo(int length) {
-            if (length == 0)
-                return null;
-
-            if (length < cargos.length) {
-                String result = cargos[length];
-                if (result == null) {
-                    result = getCargoImpl(length);
-                    cargos[length] = result;
-                }
-                return result;
-            }
-            return getCargoImpl(length);
-        }
-
-        private String getCargoImpl(int length) {
-            StringBuilder sb = new StringBuilder(length);
-            for (int i = length; --i >=0; ) {
-                sb.append('a');
-            }
-            return sb.toString();
-        }
-    }
-
-    /**
-     * Clients listen on different messages in the topic.
-     * The 'TYPE' property helps the client to select the
-     * proper messages.
-     */
-    private enum ClientType {
-        A ("a", "b", "c"),
-        B ("c", "d", "e"),
-        C ("d", "e", "f"),
-        D ("g", "h");
-
-        public final String[] messageTypes;
-        public final HashSet<String> messageTypeSet;
-        public final String selector;
-
-        ClientType(String... messageTypes) {
-            this.messageTypes = messageTypes;
-            messageTypeSet = new HashSet<String>(Arrays.asList(messageTypes));
-
-            StringBuilder sb = new StringBuilder("TYPE in (");
-            for (int i = 0; i < messageTypes.length; i++) {
-                if (i > 0)
-                    sb.append(", ");
-                sb.append('\'').append(messageTypes[i]).append('\'');
-            }
-            sb.append(')');
-            selector = sb.toString();
-        }
-
-        public static ClientType randomClientType() {
-            return values()[DurableSubProcessTest.random(values().length - 1)];
-        }
-
-        public final String randomMessageType() {
-            return messageTypes[DurableSubProcessTest.random(messageTypes.length - 1)];
-        }
-
-        public static String randomNonRelevantMessageType() {
-            return Integer.toString(DurableSubProcessTest.random(20));
-        }
-
-        public final boolean isRelevant(String messageType) {
-            return messageTypeSet.contains(messageType);
-        }
-
-        @Override
-        public final String toString() {
-            return this.name() /*+ '[' + selector + ']'*/;
-        }
-    }
-
-    /**
-     * Creates new clients.
-     */
-    private final class ClientManager extends Thread {
-
-        private int clientRover = 0;
-
-        private final CopyOnWriteArrayList<Client> clients = new CopyOnWriteArrayList<Client>();
-
-        public ClientManager() {
-            super("ClientManager");
-            setDaemon(true);
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    if (clients.size() < MAX_CLIENTS)
-                        createNewClient();
-
-                    int size = clients.size();
-                    sleepRandom(size * 3 * 1000, size * 6 * 1000);
-                }
-            }
-            catch (Throwable e) {
-                exit("ClientManager.run failed.", e);
-            }
-        }
-
-        private void createNewClient() throws JMSException {
-            ClientType type = ClientType.randomClientType();
-
-            Client client;
-            synchronized (server.sendMutex) {
-                client = new Client(++clientRover, type, CLIENT_LIFETIME, CLIENT_ONLINE, CLIENT_OFFLINE);
-                clients.add(client);
-            }
-            client.start();
-
-            LOG.info(client.toString() + " created. " + this);
-        }
-
-        public void removeClient(Client client) {
-            clients.remove(client);
-        }
-
-        public void onServerMessage(Message message) throws JMSException {
-            for (Client client: clients) {
-                client.onServerMessage(message);
-            }
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder("ClientManager[count=");
-            sb.append(clients.size());
-            sb.append(", clients=");
-            boolean sep = false;
-            for (Client client: clients) {
-                if (sep) sb.append(", ");
-                else sep = true;
-                sb.append(client.toString());
-            }
-            sb.append(']');
-            return sb.toString();
-        }
-    }
-
-    /**
-     * Consumes messages from a durable subscription.
-     * Goes online/offline periodically. Checks the incoming messages
-     * against the sent messages of the server.
-     */
-    private final class Client extends Thread {
-
-        final ConnectionFactory cf = new HedwigConnectionFactoryImpl();
-
-        public static final String SUBSCRIPTION_NAME = "subscription";
-
-        private final int id;
-        private final String conClientId;
-
-        private final Random lifetime;
-        private final Random online;
-        private final Random offline;
-        private int numIter = 0;
-
-        private final ClientType clientType;
-        private final String selector;
-
-        private final ConcurrentLinkedQueue<Message> waitingList = new ConcurrentLinkedQueue<Message>();
-
-        public Client(int id, ClientType clientType, Random lifetime, Random online, Random offline)
-                throws JMSException {
-            super("Client" + id);
-            setDaemon(true);
-
-            this.id = id;
-            conClientId = "cli" + id;
-            this.clientType = clientType;
-            selector = "(COMMIT = true and RELEVANT = true) or " + clientType.selector;
-
-            this.lifetime = lifetime;
-            this.online = online;
-            this.offline = offline;
-
-            subscribe();
-        }
-
-        @Override
-        public void run() {
-            long end = System.currentTimeMillis() + lifetime.next();
-            try {
-                boolean sleep = false;
-                while (true) {
-                    long max = end - System.currentTimeMillis();
-                    if (max <= 0)
-                        break;
-
-                    if (sleep) offline.sleepRandom();
-                    else sleep = true;
-
-                    numIter ++;
-                    process(online.next());
-                }
-
-                if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0)
-                    unsubscribe();
-                else {
-                    LOG.info("Client abandon the subscription. " + this);
-
-                    // housekeeper should sweep these abandoned subscriptions
-                    houseKeeper.abandonedSubscriptions.add(conClientId);
-                }
-            }
-            catch (Throwable e) {
-                exit(toString() + " failed.", e);
-            }
-
-            clientManager.removeClient(this);
-            LOG.info(toString() + " DONE.");
-        }
-
-        private void process(long millis) throws JMSException {
-            long end = System.currentTimeMillis() + millis;
-            long hardEnd = end + 2000; // wait to finish the transaction.
-            boolean inTransaction = false;
-            int transCount = 0;
-
-            LOG.info(toString() + " ONLINE.");
-            Connection con = openConnection();
-            Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-            MessageConsumer consumer = sess.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, false);
-            try {
-                do {
-                    long max = end - System.currentTimeMillis();
-                    if (max <= 0) {
-                        if (!inTransaction)
-                            break;
-
-                        max = hardEnd - System.currentTimeMillis();
-                        if (max <= 0)
-                            exit("" + this + " failed: Transaction is not finished.");
-                    }
-
-                    Message message = consumer.receive(max);
-                    if (message == null)
-                        continue;
-
-                    onClientMessage(message);
-
-                    if (message.propertyExists("COMMIT")) {
-                        message.acknowledge();
-
-                        LOG.info("Received Trans[id=" + message.getIntProperty("TRANS")
-                                 + ", count=" + transCount + "] in " + this + ".");
-
-                        inTransaction = false;
-                        transCount = 0;
-                    }
-                    else {
-                        inTransaction = true;
-                        transCount++;
-                    }
-                } while (true);
-            }
-            finally {
-                sess.close();
-                con.close();
-
-                LOG.info(toString() + " OFFLINE.");
-
-                // Check if the messages are in the waiting
-                // list for a long time.
-                Message topMessage = waitingList.peek();
-                if (topMessage != null)
-                    checkDeliveryTime(topMessage);
-            }
-        }
-
-        public void onServerMessage(Message message) throws JMSException {
-            if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) {
-                if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT")))
-                    waitingList.add(message);
-            }
-            else {
-                String messageType = message.getStringProperty("TYPE");
-                if (clientType.isRelevant(messageType))
-                    waitingList.add(message);
-            }
-        }
-
-        public void onClientMessage(Message message) {
-            Message serverMessage = waitingList.poll();
-            try {
-                if (serverMessage == null)
-                    exit("" + this + " failed: There is no next server message, but received: " + message);
-
-                Integer receivedId = (Integer) message.getObjectProperty("ID");
-                Integer serverId = (Integer) serverMessage.getObjectProperty("ID");
-                if (receivedId == null || serverId == null)
-                    exit("" + this + " failed: message ID not found.\r\n" +
-                            " received: " + message + "\r\n" +
-                            "   server: " + serverMessage);
-
-                if (!serverId.equals(receivedId))
-                    exit("" + this + " failed: Received wrong message.\r\n" +
-                            " received: " + message + "\r\n" +
-                            "   server: " + serverMessage);
-
-                checkDeliveryTime(message);
-            }
-            catch (Throwable e) {
-                exit("" + this + ".onClientMessage failed.\r\n" +
-                        " received: " + message + "\r\n" +
-                        "   server: " + serverMessage, e);
-            }
-        }
-
-        /**
-         * Checks if the message was not delivered fast enough.
-         */
-        public void checkDeliveryTime(Message message) throws JMSException {
-            long creation = message.getJMSTimestamp();
-            // + 1000 for the various additional setup times that activemq supports (if I am not wrong !)
-            long min = System.currentTimeMillis() - numIter * ((offline.max + online.min) + 1000);
-
-            if (min > creation) {
-                SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
-                exit("" + this + ".checkDeliveryTime failed. Message time: " + df.format(new Date(creation))
-                     + ", min: " + df.format(new Date(min)) + "\r\n" + message);
-            }
-        }
-
-        private Connection openConnection() throws JMSException {
-            Connection con = cf.createConnection();
-            con.setClientID(conClientId);
-            con.start();
-            return con;
-        }
-
-        private void subscribe() throws JMSException {
-            Connection con = openConnection();
-            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, true);
-            session.close();
-            con.close();
-        }
-
-        private void unsubscribe() throws JMSException {
-            try {
-                Connection con = openConnection();
-                Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                session.unsubscribe(SUBSCRIPTION_NAME);
-                session.close();
-                con.close();
-            } catch (JMSException jsEx){ // ignore
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "Client[id=" + id + ", type=" + clientType + "]";
-        }
-    }
-
-    /**
-     * Sweeps out not-used durable subscriptions.
-     */
-    private final class HouseKeeper extends Thread {
-
-        private HouseKeeper() {
-            super("HouseKeeper");
-            setDaemon(true);
-        }
-
-        public final CopyOnWriteArrayList<String> abandonedSubscriptions = new CopyOnWriteArrayList<String>();
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    Thread.sleep(60 * 1000);
-                    sweep();
-                }
-                catch (InterruptedException ex) {
-                    break;
-                }
-                catch (Throwable e) {
-                    Exception log = new Exception("HouseKeeper failed.", e);
-                    log.printStackTrace();
-                }
-            }
-        }
-
-        private void sweep() throws Exception {
-            LOG.info("Housekeeper sweeping.");
-
-            int closed = 0;
-            ArrayList<String> sweeped = new ArrayList<String>();
-            try {
-                for (String clientId: abandonedSubscriptions) {
-                    sweeped.add(clientId);
-                    LOG.info("Sweeping out subscription of " + clientId + ".");
-                    closed++;
-                }
-            }
-            finally {
-                abandonedSubscriptions.removeAll(sweeped);
-            }
-
-            LOG.info("Housekeeper sweeped out " + closed + " subscriptions.");
-        }
-    }
-
-    public static int random(int max) {
-        return (int) (Math.random() * (max + 1));
-    }
-
-    public static int random(int min, int max) {
-        return random(max - min) + min;
-    }
-
-    public static void sleepRandom(int maxMillis) throws InterruptedException {
-        Thread.sleep(random(maxMillis));
-    }
-
-    public static void sleepRandom(int minMillis, int maxMillis) throws InterruptedException {
-        Thread.sleep(random(minMillis, maxMillis));
-    }
-
-    public static final class Random {
-
-        final int min;
-        final int max;
-
-        Random(int min, int max) {
-            this.min = min;
-            this.max = max;
-        }
-
-        public int next() {
-            return random(min, max);
-        }
-
-        public void sleepRandom() throws InterruptedException {
-            DurableSubProcessTest.sleepRandom(min, max);
-        }
-    }
-
-    public static void exit(String message) {
-        exit(message, null);
-    }
-
-    public static void exit(String message, Throwable e) {
-        Throwable log = new RuntimeException(message, e);
-        log.printStackTrace();
-        LOG.error(message, e);
-        exceptions.add(e);
-        fail(message);
-    }
-
-    protected void setUp() throws Exception {
-        topic = (Topic) createDestination();
-
-        clientManager = new ClientManager();
-        server = new Server();
-        houseKeeper = new HouseKeeper();
-
-        super.setUp();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java
deleted file mode 100644
index b8d12da..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DurableSubSelectorDelayTest extends JmsTestBase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DurableSubSelectorDelayTest.class);
-
-    public static final long RUNTIME = 3 * 60 * 1000;
-
-    private Topic topic;
-
-    public void testProcess() throws Exception {
-
-        DurableSubscriber subscribers[] = new DurableSubscriber[10];
-
-        for (int i = 0; i < subscribers.length; i++) {
-            subscribers[i] = new DurableSubscriber(i);
-        }
-
-        MsgProducer msgProducer = new MsgProducer();
-        msgProducer.start();
-
-        for (int i = 0; i < subscribers.length; i++) {
-            subscribers[i].process();
-        }
-
-
-        // wait for server to finish
-        msgProducer.join();
-
-        for (int j = 0; j < subscribers.length; j++) {
-            LOG.info("Unsubscribing subscriber " + subscribers[j]);
-            subscribers[j].unsubscribe();
-        }
-
-        // allow the clean up thread time to run
-        TimeUnit.MINUTES.sleep(2);
-        LOG.info("DONE.");
-    }
-
-    /**
-     * Message Producer
-     */
-    final class MsgProducer extends Thread {
-
-        final ConnectionFactory cf = new HedwigConnectionFactoryImpl();
-
-        int transRover = 0;
-        int messageRover = 0;
-
-        public MsgProducer() {
-            super("MsgProducer");
-            setDaemon(true);
-        }
-
-        @Override
-        public void run() {
-            long endTime = RUNTIME + System.currentTimeMillis();
-
-            try {
-                while (endTime > System.currentTimeMillis()) {
-                    Thread.sleep(400);
-                    send();
-                }
-            } catch (Throwable e) {
-               e.printStackTrace(System.out);
-               throw new RuntimeException(e);
-            }
-        }
-
-        public void send() throws JMSException {
-
-            int trans = ++transRover;
-            boolean relevantTrans = true;
-            int count = 40;
-
-            LOG.info("Sending Trans[id=" + trans + ", count="
-                    + count + "]");
-
-            Connection con = cf.createConnection();
-
-            Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            MessageProducer prod = sess.createProducer(null);
-
-            for (int i = 0; i < count; i++) {
-                Message message = sess.createMessage();
-                message.setIntProperty("ID", ++messageRover);
-                message.setIntProperty("TRANS", trans);
-                message.setBooleanProperty("RELEVANT", false);
-                prod.send(topic, message);
-            }
-
-            Message message = sess.createMessage();
-            message.setIntProperty("ID", ++messageRover);
-            message.setIntProperty("TRANS", trans);
-            message.setBooleanProperty("COMMIT", true);
-            message.setBooleanProperty("RELEVANT", relevantTrans);
-            prod.send(topic, message);
-
-            LOG.info("Committed Trans[id=" + trans + ", count="
-                    + count + "], ID=" + messageRover);
-
-            sess.close();
-            con.close();
-        }
-    }
-
-    /**
-     * Consumes messages from a durable subscription. Goes online/offline
-     * periodically. Checks the incoming messages against the sent messages of
-     * the server.
-     */
-    private final class DurableSubscriber {
-
-        final ConnectionFactory cf = new HedwigConnectionFactoryImpl();
-
-        private final String subName ;
-
-        private final int id;
-        private final String conClientId;
-        private final String selector;
-
-        private final Session sess;
-        private final MessageConsumer consumer;
-        private final Connection con;
-
-        public DurableSubscriber(int id) throws JMSException {
-            this.id = id;
-            conClientId = "cli" + id;
-            subName = "subscription"+ id;
-            selector ="RELEVANT = true";
-            con = openConnection();
-            sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            consumer = sess.createDurableSubscriber(topic, subName, selector, false);
-
-        }
-
-        private void process() throws JMSException {
-            long end = System.currentTimeMillis() + 20000;
-            int transCount = 0;
-
-            LOG.info(toString() + " ONLINE.");
-
-            try {
-
-                do {
-                    long max = end - System.currentTimeMillis();
-
-                    if (max <= 0) {
-                            break;
-                    }
-
-                    Message message = consumer.receive(max);
-                    if (message == null) {
-                        continue;
-                    }
-
-                    LOG.info("Received Trans[id="
-                            + message.getIntProperty("TRANS") + ", count="
-                            + transCount + "] in " + this + ".");
-
-                } while (true);
-
-            } finally {
-                sess.close();
-                con.close();
-
-                LOG.info(toString() + " OFFLINE.");
-            }
-        }
-
-        private Connection openConnection() throws JMSException {
-            Connection con = cf.createConnection();
-            con.setClientID(conClientId);
-            con.start();
-            return con;
-        }
-
-        private void unsubscribe() throws JMSException {
-            Connection con = openConnection();
-            Session session = con
-                    .createSession(false, Session.AUTO_ACKNOWLEDGE);
-            // Call a dummy createDurableSubscriber (see o.a.h.jms.package-info.html for more on why).
-            session.createDurableSubscriber(topic, subName, selector, false);
-            session.unsubscribe(subName);
-            session.close();
-            con.close();
-        }
-
-        @Override
-        public String toString() {
-            return "DurableSubscriber[id=" + id + "]";
-        }
-    }
-
-    public void setUp() throws Exception {
-        super.setUp();
-        topic = SessionImpl.asTopic("TopicT");
-    }
-
-    public String getName() {
-        return "DurableSubSelectorDelayTest";
-    }
-
-    private static boolean delete(File path) {
-        if (path == null)
-            return true;
-
-        if (path.isDirectory()) {
-            for (File file : path.listFiles()) {
-                delete(file);
-            }
-        }
-        return path.delete();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
deleted file mode 100644
index 0a9968b..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.util.concurrent.TimeUnit;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-import static org.junit.Assert.assertNotNull;
-
-public class DurableSubscriptionHangTestCase extends JmsTestBase {
-    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class);
-    final static String clientID = "myId";
-    private static final String topicName = "myTopic";
-    private static final String durableSubName = "mySub";
-
-    public void testHanging() throws Exception
-    {
-        registerDurableSubscription();
-        produceExpiredAndOneNonExpiredMessages(1000);
-        TimeUnit.SECONDS.sleep(10); // make sure messages are expired
-        int numMessages = collectMessagesFromDurableSubscriptionForOneMinute();
-        assert 1 == numMessages : "Expected " + 1 + ", received " + numMessages;
-    }
-
-    private void produceExpiredAndOneNonExpiredMessages(final int messageCount) throws JMSException {
-        HedwigConnectionFactoryImpl connectionFactory = new HedwigConnectionFactoryImpl();
-        TopicConnection connection = connectionFactory.createTopicConnection();
-        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic(topicName);
-        MessageProducer producer = session.createProducer(topic);
-        producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1));
-        for(int i=0; i<messageCount; i++) {
-            sendRandomMessage(session, producer);
-        }
-        producer.setTimeToLive(TimeUnit.DAYS.toMillis(1));
-        sendRandomMessage(session, producer);
-        connection.close();
-        LOG.info("produceExpiredAndOneNonExpiredMessages done");
-    }
-
-    private void registerDurableSubscription() throws JMSException
-    {
-        HedwigConnectionFactoryImpl connectionFactory = new HedwigConnectionFactoryImpl();
-        TopicConnection connection = connectionFactory.createTopicConnection();
-        connection.setClientID(clientID);
-        TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic topic = topicSession.createTopic(topicName);
-        TopicSubscriber durableSubscriber = topicSession.createDurableSubscriber(topic, durableSubName);
-        connection.start();
-        durableSubscriber.close();
-        connection.close();
-        LOG.info("Durable Sub Registered");
-    }
-
-    private int collectMessagesFromDurableSubscriptionForOneMinute() throws Exception
-    {
-        HedwigConnectionFactoryImpl connectionFactory = new HedwigConnectionFactoryImpl();
-        TopicConnection connection = connectionFactory.createTopicConnection();
-
-        connection.setClientID(clientID);
-        TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic topic = topicSession.createTopic(topicName);
-        connection.start();
-        TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName);
-        LOG.info("About to receive messages");
-        int retval = 0;
-        while (true){
-            Message message = subscriber.receive(2000);
-            if (null == message) {
-                break;
-            }
-            retval ++;
-        }
-        subscriber.close();
-        connection.close();
-        LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
-
-        return retval;
-    }
-
-    private void sendRandomMessage(TopicSession session, MessageProducer producer) throws JMSException {
-        TextMessage textMessage = session.createTextMessage();
-        textMessage.setText(RandomStringUtils.random(500, "abcdefghijklmnopqrstuvwxyz"));
-        producer.send(textMessage);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/MyObject.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/MyObject.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/MyObject.java
deleted file mode 100644
index edf180b..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/MyObject.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.usecases;
-
-import java.io.Serializable;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.IOException;
-import java.io.ObjectStreamException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class MyObject implements Serializable {
-
-    private String message;
-    private AtomicInteger writeObjectCalled = new AtomicInteger(0);
-    private AtomicInteger readObjectCalled = new AtomicInteger(0);
-    private AtomicInteger readObjectNoDataCalled = new AtomicInteger(0);
-
-    public MyObject(String message) {
-        this.setMessage(message);
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
-        writeObjectCalled.incrementAndGet();
-        out.defaultWriteObject();
-    }
-
-    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.defaultReadObject();
-        readObjectCalled.incrementAndGet();
-    }
-
-    private void readObjectNoData() throws ObjectStreamException {
-        readObjectNoDataCalled.incrementAndGet();
-    }
-
-    public int getWriteObjectCalled() {
-        return writeObjectCalled.get();
-    }
-
-    public int getReadObjectCalled() {
-        return readObjectCalled.get();
-    }
-
-    public int getReadObjectNoDataCalled() {
-        return readObjectNoDataCalled.get();
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
deleted file mode 100644
index adfb47d..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NonBlockingConsumerRedeliveryTest extends JmsTestBase {
-    private static final Logger LOG = LoggerFactory.getLogger(NonBlockingConsumerRedeliveryTest.class);
-
-    private final String destinationName = "Destination";
-    // private final int MSG_COUNT = 100;
-
-    private HedwigConnectionFactoryImpl connectionFactory;
-
-    public void testMessageDeleiveredWhenNonBlockingEnabled() throws Exception {
-
-        final List<String> received = new ArrayList<String>(16);
-        final List<String> beforeRollback = new ArrayList<String>(16);
-        final List<String> afterRollback = new ArrayList<String>(16);
-
-        HedwigConnectionImpl connection = connectionFactory.createConnection();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createTopic(destinationName);
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                try {
-                    received.add(((TextMessage) message).getText());
-                } catch (JMSException e) {
-                    // should not happen ...
-                    e.printStackTrace();
-                }
-            }
-        });
-
-        final int MSG_COUNT = connection.getHedwigClientConfig().getMaximumOutstandingMessages() - 1;
-
-        sendMessages(MSG_COUNT);
-
-        session.commit();
-        connection.start();
-
-
-        Wait.waitFor(new Wait.Condition() {
-            public boolean isSatisified() throws Exception {
-                LOG.info("Consumer has received " + received.size() + " messages.");
-                return received.size() == MSG_COUNT;
-            }
-        });
-
-        assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages, got "
-                   + received.size() + ".",MSG_COUNT == received.size());
-
-        beforeRollback.addAll(received);
-        received.clear();
-        session.rollback();
-
-        assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
-            Wait.waitFor(new Wait.Condition(){
-                public boolean isSatisified() throws Exception {
-                    LOG.info("Consumer has received " + received.size() + " messages since rollback.");
-                    return received.size() == MSG_COUNT;
-                }
-            }
-        ));
-
-        afterRollback.addAll(received);
-        received.clear();
-
-        assertEquals(beforeRollback.size(), afterRollback.size());
-        assertEquals(beforeRollback, afterRollback);
-        session.commit();
-    }
-
-    public void testMessageDeleiveryDoesntStop() throws Exception {
-
-        final List<String> received = Collections.synchronizedList(new ArrayList<String>(16));
-        final List<String> beforeRollback = new ArrayList<String>(16);
-        final List<String> afterRollback = new ArrayList<String>(16);
-
-        HedwigConnectionImpl connection = connectionFactory.createConnection();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createTopic(destinationName);
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                try {
-                    received.add(((TextMessage) message).getText());
-                } catch (JMSException e) {
-                    // should not happen
-                    e.printStackTrace();
-                }
-            }
-        });
-
-        final int MSG_COUNT = connection.getHedwigClientConfig().getMaximumOutstandingMessages() / 2 - 1;
-
-        sendMessages(MSG_COUNT);
-        connection.start();
-
-        assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
-            Wait.waitFor(new Wait.Condition(){
-                public boolean isSatisified() throws Exception {
-                    LOG.info("Consumer has received " + received.size() + " messages.");
-                    return received.size() == MSG_COUNT;
-                }
-            }
-        ));
-
-        beforeRollback.addAll(received);
-        received.clear();
-        session.rollback();
-
-        sendMessages(MSG_COUNT);
-
-        {
-            boolean messagesReceived = Wait.waitFor(new Wait.Condition(){
-                public boolean isSatisified() throws Exception {
-                    LOG.info("Consumer has received " + received.size() + " messages since rollback.");
-                    return received.size() == MSG_COUNT * 2;
-                }
-            });
-            assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages, received "
-                       + received.size() + " messages.", messagesReceived);
-        }
-
-        afterRollback.addAll(received);
-        received.clear();
-
-        assertEquals(beforeRollback.size() * 2, afterRollback.size());
-
-        session.commit();
-    }
-
-    public void testNonBlockingMessageDeleiveryIsDelayed() throws Exception {
-        final List<String> received = new ArrayList<String>(16);
-
-        HedwigConnectionImpl connection = (HedwigConnectionImpl) connectionFactory.createConnection();
-        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createTopic(destinationName);
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                try {
-                    received.add(((TextMessage) message).getText());
-                } catch (JMSException e) {
-                    // should not happen
-                    e.printStackTrace();
-                }
-            }
-        });
-
-        final int MSG_COUNT = connection.getHedwigClientConfig().getMaximumOutstandingMessages() - 1;
-
-        sendMessages(MSG_COUNT);
-        connection.start();
-
-        assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
-            Wait.waitFor(new Wait.Condition(){
-                public boolean isSatisified() throws Exception {
-                    LOG.info("Consumer has received " + received.size() + " messages.");
-                    return received.size() == MSG_COUNT;
-                }
-            }
-        ));
-
-        received.clear();
-        session.rollback();
-
-        {
-            boolean condition = Wait.waitFor(new Wait.Condition() {
-                public boolean isSatisified() throws Exception {
-                    return received.size() > 0;
-                }
-            }, TimeUnit.SECONDS.toMillis(4)
-            );
-            // We do not have any notion of delaying rederlivery - so we immediately get the message (unlike activemq's
-            // connection.getRedeliveryPolicy().setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(6));
-            // assertFalse("Delayed redelivery test not expecting any messages yet. got "
-            // + received.size() + " messages", condition);
-            assertTrue("Rollback expects to receive: " + MSG_COUNT + " messages.",
-                Wait.waitFor(new Wait.Condition(){
-                    public boolean isSatisified() throws Exception {
-                        LOG.info("Consumer has received " + received.size() + " messages.");
-                        return received.size() == MSG_COUNT;
-                    }
-                }
-            ));
-        }
-
-        session.commit();
-        session.close();
-    }
-
-    public void testNonBlockingMessageDeleiveryWithRollbacks() throws Exception {
-        final List<String> received = new ArrayList<String>(16);
-
-        HedwigConnectionImpl connection = (HedwigConnectionImpl) connectionFactory.createConnection();
-        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        final Destination destination = session.createTopic(destinationName);
-        final MessageConsumer consumer = session.createConsumer(destination);
-
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                try {
-                    received.add(((TextMessage) message).getText());
-                } catch (JMSException e) {
-                    // should not happen
-                    e.printStackTrace();
-                }
-            }
-        });
-
-        final int MSG_COUNT = connection.getHedwigClientConfig().getMaximumOutstandingMessages() - 1;
-
-        sendMessages(MSG_COUNT);
-        connection.start();
-
-        assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
-            Wait.waitFor(new Wait.Condition(){
-                public boolean isSatisified() throws Exception {
-                    LOG.info("Consumer has received " + received.size() + " messages.");
-                    return received.size() == MSG_COUNT;
-                }
-            }
-        ));
-
-        received.clear();
-
-        consumer.setMessageListener(new MessageListener() {
-
-            int count = 0;
-
-            @Override
-            public void onMessage(Message message) {
-
-                if (++count > 10) {
-                    try {
-                        session.rollback();
-                        LOG.info("Rolling back session.");
-                        count = 0;
-                    } catch (JMSException e) {
-                        LOG.warn("Caught an unexcepted exception: " + e.getMessage());
-                    }
-                } else {
-                    try {
-                        received.add(((TextMessage) message).getText());
-                    } catch (JMSException e) {
-                        // should not happen
-                        e.printStackTrace();
-                    }
-                    try {
-                        session.commit();
-                    } catch (JMSException e) {
-                        LOG.warn("Caught an unexcepted exception: " + e.getMessage());
-                    }
-                }
-            }
-        });
-
-        session.rollback();
-
-        assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
-            Wait.waitFor(new Wait.Condition(){
-                public boolean isSatisified() throws Exception {
-                    LOG.info("Consumer has received " + received.size() + " messages since rollback.");
-                    return received.size() == MSG_COUNT;
-                }
-            }
-        ));
-
-        assertEquals(MSG_COUNT, received.size());
-        session.commit();
-    }
-
-    private void sendMessages(final int MSG_COUNT) throws Exception {
-        Connection connection = connectionFactory.createConnection();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createTopic(destinationName);
-        MessageProducer producer = session.createProducer(destination);
-        for(int i = 0; i < MSG_COUNT; ++i) {
-            producer.send(session.createTextMessage("" + i));
-        }
-    }
-
-    public void setUp() throws Exception {
-        super.setUp();
-        connectionFactory = new HedwigConnectionFactoryImpl();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java
deleted file mode 100644
index fa7b0ea..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Topic;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.CombinationTestSupport;
-
-import javax.jms.Destination;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ObjectMessageNotSerializableTest extends CombinationTestSupport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ObjectMessageNotSerializableTest.class);
-
-    AtomicInteger numReceived = new AtomicInteger(0);
-    final Vector<Throwable> exceptions = new Vector<Throwable>();
-
-    public static Test suite() {
-        return suite(ObjectMessageNotSerializableTest.class);
-    }
-
-    public static void main(String[] args) {
-        junit.textui.TestRunner.run(suite());
-    }
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        exceptions.clear();
-    }
-
-    public void testSendNotSerializeableObjectMessage() throws Exception {
-
-        final  Destination destination = SessionImpl.asTopic("testT");
-        final MyObject obj = new MyObject("A message");
-
-        final CountDownLatch consumerStarted = new CountDownLatch(1);
-
-        Thread vmConsumerThread = new Thread("Consumer Thread") {
-                public void run() {
-                    try {
-                    HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
-
-                    Connection connection = factory.createConnection();
-                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                    MessageConsumer consumer = session.createConsumer(destination);
-                    connection.start();
-                    consumerStarted.countDown();
-                    ObjectMessage message = (ObjectMessage) consumer.receive(30000);
-                    if ( message != null ) {
-                        MyObject object = (MyObject)message.getObject();
-                        LOG.info("Got message " + object.getMessage());
-                        numReceived.incrementAndGet();
-                    }
-                    consumer.close();
-                    } catch (Throwable ex) {
-                        exceptions.add(ex);
-                    }
-                }
-            };
-        vmConsumerThread.start();
-
-        Thread producingThread = new Thread("Producing Thread") {
-                public void run() {
-                    try {
-                        HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
-
-                        Connection connection = factory.createConnection();
-                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                        MessageProducer producer = session.createProducer(destination);
-                        ObjectMessage message = session.createObjectMessage();
-                        message.setObject(obj);
-                        producer.send(message);
-                        producer.close();
-                    } catch (Throwable ex) {
-                        exceptions.add(ex);
-                    }
-                }
-            };
-
-        assertTrue("consumers started", consumerStarted.await(10, TimeUnit.SECONDS));
-        producingThread.start();
-
-        vmConsumerThread.join();
-        producingThread.join();
-
-        assert obj.getWriteObjectCalled() > 0 : "writeObject not called";
-        assert 0 == obj.getReadObjectCalled() : "readObject called";
-        assert 0 == obj.getReadObjectNoDataCalled() : "readObjectNoData called ?";
-
-        assertEquals("Got expected messages", 1, numReceived.get());
-        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
-    }
-
-    public void testSendNotSerializeableObjectMessageOverTcp() throws Exception {
-        final  Destination destination = SessionImpl.asTopic("testTopic");
-        final MyObject obj = new MyObject("A message");
-
-        final CountDownLatch consumerStarted = new CountDownLatch(3);
-        final Vector<Throwable> exceptions = new Vector<Throwable>();
-        Thread vmConsumerThread = new Thread("Consumer Thread") {
-                public void run() {
-                    try {
-                        HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
-
-                        Connection connection = factory.createConnection();
-                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                        MessageConsumer consumer = session.createConsumer(destination);
-                        connection.start();
-                        consumerStarted.countDown();
-                        ObjectMessage message = (ObjectMessage)consumer.receive(30000);
-                        if ( message != null ) {
-                        MyObject object = (MyObject)message.getObject();
-                        LOG.info("Got message " + object.getMessage());
-                        numReceived.incrementAndGet();
-                    }
-                        consumer.close();
-                    } catch (Throwable ex) {
-                        exceptions.add(ex);
-                    }
-                }
-            };
-        vmConsumerThread.start();
-
-        Thread tcpConsumerThread = new Thread("Consumer Thread") {
-                public void run() {
-                    try {
-
-                    HedwigConnectionFactoryImpl factory =
-                        new HedwigConnectionFactoryImpl();
-
-                    Connection connection = factory.createConnection();
-                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                    MessageConsumer consumer = session.createConsumer(destination);
-                    connection.start();
-                    consumerStarted.countDown();
-                    ObjectMessage message = (ObjectMessage)consumer.receive(30000);
-                    if ( message != null ) {
-                        MyObject object = (MyObject)message.getObject();
-                        LOG.info("Got message " + object.getMessage());
-                        numReceived.incrementAndGet();
-                        assert object.getReadObjectCalled() > 0 : "readObject called";
-                    }
-                    consumer.close();
-                    } catch (Throwable ex) {
-                        exceptions.add(ex);
-                    }
-                }
-            };
-        tcpConsumerThread.start();
-
-
-        Thread notherVmConsumerThread = new Thread("Consumer Thread") {
-            public void run() {
-                try {
-                    HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
-
-                    Connection connection = factory.createConnection();
-                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                    MessageConsumer consumer = session.createConsumer(destination);
-                    connection.start();
-                    consumerStarted.countDown();
-                    ObjectMessage message = (ObjectMessage)consumer.receive(30000);
-                    if ( message != null ) {
-                        MyObject object = (MyObject)message.getObject();
-                        LOG.info("Got message " + object.getMessage());
-                        numReceived.incrementAndGet();
-                    }
-                    consumer.close();
-                } catch (Throwable ex) {
-                    exceptions.add(ex);
-                }
-            }
-        };
-        notherVmConsumerThread.start();
-
-        Thread producingThread = new Thread("Producing Thread") {
-            public void run() {
-                try {
-                    HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
-
-                    Connection connection = factory.createConnection();
-                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                    MessageProducer producer = session.createProducer(destination);
-                    ObjectMessage message = (ObjectMessage)session.createObjectMessage();
-                    message.setObject(obj);
-                    producer.send(message);
-                    producer.close();
-                } catch (Throwable ex) {
-                    exceptions.add(ex);
-                }
-            }
-            };
-
-        assertTrue("consumers started", consumerStarted.await(10, TimeUnit.SECONDS));
-        producingThread.start();
-
-        vmConsumerThread.join();
-        tcpConsumerThread.join();
-        notherVmConsumerThread.join();
-        producingThread.join();
-
-        assertEquals("writeObject called", 1, obj.getWriteObjectCalled());
-        assertEquals("readObject called", 0, obj.getReadObjectCalled());
-        assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled());
-
-        assertEquals("Got expected messages", 3, numReceived.get());
-        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ProducerConsumerTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ProducerConsumerTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ProducerConsumerTestSupport.java
deleted file mode 100644
index 3f570df..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ProducerConsumerTestSupport.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-
-/**
- * Base class for simple test cases using a single connection, session
- * producer and consumer
- */
-public class ProducerConsumerTestSupport extends TestSupport {
-    protected Connection connection;
-    protected Session session;
-    protected MessageProducer producer;
-    protected MessageConsumer consumer;
-    protected Destination destination;
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        connection = createConnection();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        destination = this.createDestination(getSubject());
-        producer = session.createProducer(destination);
-        consumer = session.createConsumer(destination);
-        connection.start();
-    }
-
-    protected void tearDown() throws Exception {
-        consumer.close();
-        producer.close();
-        session.close();
-        connection.close();
-        super.tearDown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnDurableTopicConsumedMessageTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnDurableTopicConsumedMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnDurableTopicConsumedMessageTest.java
deleted file mode 100644
index 6fd3ef5..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnDurableTopicConsumedMessageTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Topic;
-
-public class PublishOnDurableTopicConsumedMessageTest extends PublishOnTopicConsumedMessageTest {
-
-    protected void setUp() throws Exception {
-        this.durable = true;
-        super.setUp();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java
deleted file mode 100644
index 31d0219..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-
-import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
-import org.apache.hedwig.jms.message.MessageImpl;
-import org.apache.hedwig.jms.message.MessageUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PublishOnTopicConsumedMessageTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PublishOnTopicConsumedMessageTest.class);
-
-    private MessageProducer replyProducer;
-
-    public synchronized void onMessage(Message message) {
-
-        // lets resend the message somewhere else
-        try {
-            Message msgCopy = MessageUtil.createMessageCopy(null, message);
-            replyProducer.send(msgCopy);
-
-            // log.info("Sending reply: " + message);
-            super.onMessage(message);
-        } catch (JMSException e) {
-            LOG.info("Failed to send message: " + e);
-            e.printStackTrace();
-        }
-    }
-
-    protected void setUp() throws Exception {
-        super.setUp();
-
-        Destination replyDestination = null;
-
-        if (topic) {
-            replyDestination = receiveSession.createTopic("REPLY." + getSubject());
-        } else {
-            replyDestination = receiveSession.createTopic("REPLY." + getSubject());
-        }
-
-        replyProducer = receiveSession.createProducer(replyDestination);
-        LOG.info("Created replyProducer: " + replyProducer);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java
deleted file mode 100644
index 0327f66..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.test.TestSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SubscribeClosePublishThenConsumeTest extends TestSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(SubscribeClosePublishThenConsumeTest.class);
-
-    public void testDurableTopic() throws Exception {
-        HedwigConnectionFactoryImpl connectionFactory = new HedwigConnectionFactoryImpl();
-
-        String topicName = "TestTopic";
-        String clientID = getName();
-        String subscriberName = "MySubscriber:" + System.currentTimeMillis();
-
-        Connection connection = connectionFactory.createConnection();
-        connection.setClientID(clientID);
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic(topicName);
-
-        // this should register a durable subscriber, we then close it to
-        // test that we get messages from the producer later on
-        TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriberName);
-        connection.start();
-
-        topic = null;
-        subscriber.close();
-        subscriber = null;
-        session.close();
-        session = null;
-
-        // Create the new connection before closing to avoid the broker shutting
-        // down.
-        // now create a new Connection, Session & Producer, send some messages &
-        // then close
-        Connection t = connectionFactory.createConnection();
-        connection.close();
-        connection = t;
-
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        topic = session.createTopic(topicName);
-        MessageProducer producer = session.createProducer(topic);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        TextMessage textMessage = session.createTextMessage("Hello World");
-        producer.send(textMessage);
-        textMessage = null;
-
-        topic = null;
-        session.close();
-        session = null;
-
-        // Now (re)register the Durable subscriber, setup a listener and wait
-        // for messages that should
-        // have been published by the previous producer
-        t = connectionFactory.createConnection();
-        connection.close();
-        connection = t;
-
-        connection.setClientID(clientID);
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        topic = session.createTopic(topicName);
-
-        subscriber = session.createDurableSubscriber(topic, subscriberName);
-        connection.start();
-
-        LOG.info("Started connection - now about to try receive the textMessage");
-
-        long time = System.currentTimeMillis();
-        Message message = subscriber.receive(15000L);
-        long elapsed = System.currentTimeMillis() - time;
-
-        LOG.info("Waited for: " + elapsed + " millis");
-
-        assertNotNull("Should have received the message we published by now", message);
-        assertTrue("should be text textMessage", message instanceof TextMessage);
-        textMessage = (TextMessage)message;
-        assertEquals("Hello World", textMessage.getText());
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TestSupport.java
deleted file mode 100644
index c2902cf..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TestSupport.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-
-
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Useful base class for unit test cases
- */
-// Does not contain any testcases.
-@Ignore
-public class TestSupport extends JmsTestBase {
-    private static final Logger LOG = LoggerFactory.getLogger(TestSupport.class);
-
-    protected HedwigConnectionFactoryImpl connectionFactory;
-    protected boolean topic = true;
-
-    public TestSupport() {
-        super();
-    }
-
-    public TestSupport(String name) {
-        super(name);
-    }
-
-    protected MessageImpl createMessage() {
-        return new MessageImpl(null);
-    }
-
-    protected Destination createDestination(String subject) {
-        if (topic) {
-            return SessionImpl.asTopic(subject);
-        } else {
-            return SessionImpl.asTopic(subject);
-        }
-    }
-
-    protected void assertTextMessagesEqual(Message[] firstSet, Message[] secondSet) throws JMSException {
-        assertTextMessagesEqual("", firstSet, secondSet);
-    }
-
-    /**
-     * @param messsage
-     * @param firstSet
-     * @param secondSet
-     */
-    protected void assertTextMessagesEqual(String messsage, Message[] firstSet,
-                                           Message[] secondSet) throws JMSException {
-        assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length);
-        for (int i = 0; i < secondSet.length; i++) {
-            TextMessage m1 = (TextMessage)firstSet[i];
-            TextMessage m2 = (TextMessage)secondSet[i];
-            assertTextMessageEqual("Message " + (i + 1) + " did not match : ", m1, m2);
-        }
-    }
-
-    protected void assertEquals(TextMessage m1, TextMessage m2) throws JMSException {
-        assertEquals("", m1, m2);
-    }
-
-    /**
-     * @param message
-     * @param firstSet
-     * @param secondSet
-     */
-    protected void assertTextMessageEqual(String message, TextMessage m1, TextMessage m2) throws JMSException {
-        assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
-        if (m1 == null) {
-            return;
-        }
-        assertEquals(message, m1.getText(), m2.getText());
-    }
-
-    protected void assertEquals(Message m1, Message m2) throws JMSException {
-        assertEquals("", m1, m2);
-    }
-
-    /**
-     * @param message
-     * @param firstSet
-     * @param secondSet
-     */
-    protected void assertEquals(String message, Message m1, Message m2) throws JMSException {
-        assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
-        if (m1 == null) {
-            return;
-        }
-        assertTrue(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getClass() == m2.getClass());
-        if (m1 instanceof TextMessage) {
-            assertTextMessageEqual(message, (TextMessage)m1, (TextMessage)m2);
-        } else {
-            assertEquals(message, m1, m2);
-        }
-    }
-
-    protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-    /**
-     * Factory method to create a new connection
-     */
-    protected Connection createConnection() throws Exception {
-        return getConnectionFactory().createConnection();
-    }
-
-    public HedwigConnectionFactoryImpl getConnectionFactory() throws Exception {
-        if (connectionFactory == null) {
-            connectionFactory = createConnectionFactory();
-            assertTrue("Should have created a connection factory!", connectionFactory != null);
-        }
-        return connectionFactory;
-    }
-
-    protected String getConsumerSubject() {
-        return getSubject();
-    }
-
-    protected String getProducerSubject() {
-        return getSubject();
-    }
-
-    protected String getSubject() {
-        return getClass().getName() + "." + getName();
-    }
-}


Mime
View raw message