import java.util.*;
import java.util.concurrent.*;
import javax.jms.*;
import javax.naming.*;

class QpidConsumerCloseRollbackDeadlock {

	private transient Connection connection;
	transient Session session;
	private transient MessageProducer emptyProducer;
	// The URL Used to connect tot he broker.
	private static String connUrl = "amqp://guest:guest@test/?brokerlist='tcp://localhost:5672?tcp_nodelay='true''&max_prefetch='1'";

	final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";

	final String CONNECTION_JNDI_NAME = "local";

	private InitialContext _ctx;
	Map<String, Destination> queueNameToDestination = new HashMap<String, Destination>();

	// the options used when creating a new queue
	private static String options = ";{create: always , node : {type : queue, durable : true}}";

	public static void main(String[] args) {
		String queueName = "TestingQ";
		QpidConsumerCloseRollbackDeadlock test = null;
		MessageConsumer consumer = null;
		QpidConsumerCloseRollbackDeadlock.QpidMqHandler handler = null;
		try {
			test = new QpidConsumerCloseRollbackDeadlock();

			// open connection, transacted session to the broker
			test.open();
			CountDownLatch latch = new CountDownLatch(1);

			// create The queues in the broker
			test.createQueue(queueName);

			// register the handlers for the queue.
			handler = new QpidMqHandler(test.session, latch);
			consumer = test.listen(queueName, handler);

		} catch (Throwable e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}

		try {
			// enqueue 1 message and commit that message
			test.enqueue(queueName, "ABCDEF");
			test.commitSession();
		} catch (Throwable e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}

		try {
			test.waitOnHandler(handler);
			consumer.close();
			test.close();
		} catch (Exception e) {

		}
	}

	/**
	 * Open a new connection to the broker and start it.
	 */
	public void open() throws Exception {
		// Set the properties ...
		Properties properties = new Properties();
		properties
				.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
		properties.put("connectionfactory." + CONNECTION_JNDI_NAME, connUrl);

		try {
			_ctx = new InitialContext(properties);
		} catch (NamingException e) {
			System.err.println("Error Setting up JNDI Context:" + e);
		}
		connection = ((ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME))
				.createConnection();

		// create transacted session
		session = connection.createSession(true, Session.SESSION_TRANSACTED);
		emptyProducer = session.createProducer(null);
		emptyProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
		connection.start();
	}

	/**
	 * Close the connection to the broker
	 */
	public void close() throws Exception {
		try {
			if (session != null) {
				session.close();
			}
		} finally {
			try {
				if (connection != null) {
					connection.close();
				}
			} finally {
				if (_ctx != null) {
					_ctx.close();
				}
			}
		}

	}

	/**
	 * Creates a new queue and adds it to the destination to queue map.
	 * 
	 */
	public String createQueue(String queueName) throws Exception {
		Destination destination = session.createQueue(queueName + options);
		if (destination != null) {
			queueNameToDestination.put(queueName, destination);
			return queueName;
		} else {
			System.out.println("Queue Created Null");
			return null;
		}
	}

	/**
	 * Create a listener for the queue and returns the message consumer
	 * 
	 * @return
	 */
	public MessageConsumer listen(String p2pConsumer, QpidMqHandler handler)
			throws Exception {
		Destination destination = queueNameToDestination.get(p2pConsumer);
		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(handler);
		return consumer;
	}

	/**
	 * Wait for consumers to complete.
	 */
	private void waitOnHandler(QpidMqHandler handler) throws Exception {

		boolean handlerCompleted = handler.latch.await(60, TimeUnit.SECONDS);
		if (!handlerCompleted) {
			System.out.println("The test failed to complete");
		}

	}

	/**
	 * Enqueue Messages
	 */
	public String enqueue(String p2pConsumer, String payload) throws Exception {
		MapMessage message = session.createMapMessage();
		message.setString("Body", payload);
		Destination destination = queueNameToDestination.get(p2pConsumer);
		emptyProducer.send(destination, message);
		return message.getJMSMessageID();
	}

	/**
	 * Commit the session
	 */
	public void commitSession() throws Exception {
		session.commit();
	}

	/**
	 * The callback handler
	 */
	static public class QpidMqHandler implements MessageListener {

		final CountDownLatch latch;
		final Session session;

		public QpidMqHandler(Session session, CountDownLatch suppliedLatch) {
			this.session = session;
			this.latch = suppliedLatch;
		}

		@Override
		final public void onMessage(Message arg0) {
			try {
				// countdown the latch and rollback to simulate a failure.
				latch.countDown();
				session.rollback();
			} catch (Exception x) {
				throw new RuntimeException(x);
			}
		}

	}

}
