Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9755E10239 for ; Mon, 13 Jan 2014 20:36:09 +0000 (UTC) Received: (qmail 9638 invoked by uid 500); 13 Jan 2014 19:05:23 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 9604 invoked by uid 500); 13 Jan 2014 19:05:21 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 9480 invoked by uid 99); 13 Jan 2014 19:05:09 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Jan 2014 19:05:09 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 59C058316AD; Mon, 13 Jan 2014 19:05:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <5a6a315d26fb4bc5a884888abe7059c2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: https://issues.apache.org/jira/browse/AMQ-4968 Date: Mon, 13 Jan 2014 19:05:08 +0000 (UTC) Updated Branches: refs/heads/trunk 6377d49a1 -> 2a7c34997 https://issues.apache.org/jira/browse/AMQ-4968 Add non-caching mode for Session producers. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2a7c3499 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2a7c3499 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2a7c3499 Branch: refs/heads/trunk Commit: 2a7c3499766c961c6996429af82c3ee374d24a33 Parents: 6377d49 Author: Timothy Bish Authored: Mon Jan 13 14:05:04 2014 -0500 Committer: Timothy Bish Committed: Mon Jan 13 14:05:04 2014 -0500 ---------------------------------------------------------------------- .../activemq/jms/pool/ConnectionPool.java | 13 +- .../jms/pool/PooledConnectionFactory.java | 28 ++++ .../activemq/jms/pool/PooledProducer.java | 24 +++- .../apache/activemq/jms/pool/PooledSession.java | 136 +++++++++++++------ .../PooledSessionNoPublisherCachingTest.java | 129 ++++++++++++++++++ .../activemq/jms/pool/PooledSessionTest.java | 71 ++++++++++ 6 files changed, 358 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index df2da17..08d2038 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -22,9 +22,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Session; -import javax.jms.IllegalStateException; import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool.impl.GenericKeyedObjectPool; @@ -51,6 +51,7 @@ public class ConnectionPool { private boolean hasExpired; private int idleTimeout = 30 * 1000; private long expiryTimeout = 0l; + private boolean useAnonymousProducers = true; private final AtomicBoolean started = new AtomicBoolean(false); private final GenericKeyedObjectPool sessionPool; @@ -78,7 +79,7 @@ public class ConnectionPool { @Override public PooledSession makeObject(SessionKey key) throws Exception { Session session = makeSession(key); - return new PooledSession(key, session, sessionPool, key.isTransacted()); + return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers); } @Override @@ -248,6 +249,14 @@ public class ConnectionPool { this.sessionPool.setMaxActive(maximumActiveSessionPerConnection); } + public boolean isUseAnonymousProducers() { + return this.useAnonymousProducers; + } + + public void setUseAnonymousProducers(boolean value) { + this.useAnonymousProducers = value; + } + /** * @return the total number of Pooled session including idle sessions that are not * currently loaned out to any client. http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java index 64eaad2..9ac853d 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java @@ -70,6 +70,7 @@ public class PooledConnectionFactory implements ConnectionFactory { private boolean blockIfSessionPoolIsFull = true; private long expiryTimeout = 0l; private boolean createConnectionOnStartup = true; + private boolean useAnonymousProducers = true; public void initConnectionsPool() { if (this.connectionsPool == null) { @@ -101,6 +102,7 @@ public class PooledConnectionFactory implements ConnectionFactory { connection.setExpiryTimeout(getExpiryTimeout()); connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection()); connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull()); + connection.setUseAnonymousProducers(isUseAnonymousProducers()); if (LOG.isTraceEnabled()) { LOG.trace("Created new connection: {}", connection); @@ -427,6 +429,32 @@ public class PooledConnectionFactory implements ConnectionFactory { } /** + * Should Sessions use one anonymous producer for all producer requests or should a new + * MessageProducer be created for each request to create a producer object, default is true. + * + * When enabled the session only needs to allocate one MessageProducer for all requests and + * the MessageProducer#send(destination, message) method can be used. Normally this is the + * right thing to do however it does result in the Broker not showing the producers per + * destination. + * + * @return true if a PooledSession will use only a single anonymous message producer instance. + */ + public boolean isUseAnonymousProducers() { + return this.useAnonymousProducers; + } + + /** + * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates + * a new MessageProducer for each call the create a MessageProducer. + * + * @param value + * Boolean value that configures whether anonymous producers are used. + */ + public void setUseAnonymousProducers(boolean value) { + this.useAnonymousProducers = value; + } + + /** * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys. * * @return this factories pool of ConnectionPool instances. http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java index 817a1f1..7f54b99 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java @@ -17,6 +17,7 @@ package org.apache.activemq.jms.pool; import javax.jms.Destination; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; @@ -34,10 +35,12 @@ public class PooledProducer implements MessageProducer { private boolean disableMessageTimestamp; private int priority; private long timeToLive; + private boolean anonymous = true; public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException { this.messageProducer = messageProducer; this.destination = destination; + this.anonymous = messageProducer.getDestination() == null; this.deliveryMode = messageProducer.getDeliveryMode(); this.disableMessageID = messageProducer.getDisableMessageID(); @@ -48,6 +51,9 @@ public class PooledProducer implements MessageProducer { @Override public void close() throws JMSException { + if (!anonymous) { + this.messageProducer.close(); + } } @Override @@ -67,13 +73,25 @@ public class PooledProducer implements MessageProducer { @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + if (destination == null) { - destination = this.destination; + if (messageProducer.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + throw new InvalidDestinationException("Don't understand null destinations"); } + MessageProducer messageProducer = getMessageProducer(); // just in case let only one thread send at once synchronized (messageProducer) { + + if (anonymous && !this.destination.equals(destination)) { + throw new UnsupportedOperationException("This producer can only send messages to: " + this.destination); + } + + // Producer will do it's own Destination validation so always use the destination + // based send method otherwise we might violate a JMS rule. messageProducer.send(destination, message, deliveryMode, priority, timeToLive); } } @@ -139,6 +157,10 @@ public class PooledProducer implements MessageProducer { return messageProducer; } + protected boolean isAnonymous() { + return anonymous; + } + @Override public String toString() { return "PooledProducer { " + messageProducer + " }"; http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java index d0e4a09..1d3fc2f 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java @@ -57,22 +57,24 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private final KeyedObjectPool sessionPool; private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList(); - private final CopyOnWriteArrayList sessionEventListeners = - new CopyOnWriteArrayList(); + private final CopyOnWriteArrayList sessionEventListeners = new CopyOnWriteArrayList(); + + private MessageProducer producer; + private TopicPublisher publisher; + private QueueSender sender; private Session session; - private MessageProducer messageProducer; - private QueueSender queueSender; - private TopicPublisher topicPublisher; private boolean transactional = true; private boolean ignoreClose; private boolean isXa; + private boolean useAnonymousProducers = true; - public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional) { + public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional, boolean anonymous) { this.key = key; this.session = session; this.sessionPool = sessionPool; this.transactional = transactional; + this.useAnonymousProducers = anonymous; } public void addSessionEventListener(PooledSessionEventListener listener) { @@ -268,7 +270,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public XAResource getXAResource() { if (session instanceof XASession) { - return ((XASession)session).getXAResource(); + return ((XASession) session).getXAResource(); } return null; } @@ -334,53 +336,39 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic)); + return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic)); } @Override public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { - return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic, selector, local)); + return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local)); } @Override public QueueReceiver createReceiver(Queue queue) throws JMSException { - return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue)); + return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue)); } @Override public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { - return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue, selector)); + return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector)); } // Producer related methods // ------------------------------------------------------------------------- @Override public MessageProducer createProducer(Destination destination) throws JMSException { - return new PooledProducer(getMessageProducer(), destination); + return new PooledProducer(getMessageProducer(destination), destination); } @Override public QueueSender createSender(Queue queue) throws JMSException { - return new PooledQueueSender(getQueueSender(), queue); + return new PooledQueueSender(getQueueSender(queue), queue); } @Override public TopicPublisher createPublisher(Topic topic) throws JMSException { - return new PooledTopicPublisher(getTopicPublisher(), topic); - } - - /** - * Callback invoked when the consumer is closed. - *

- * This is used to keep track of an explicit closed consumer created by this - * session, by which we know do not need to keep track of the consumer, as - * its already closed. - * - * @param consumer - * the consumer which is being closed - */ - protected void onConsumerClose(MessageConsumer consumer) { - consumers.remove(consumer); + return new PooledTopicPublisher(getTopicPublisher(topic), topic); } public Session getInternalSession() throws IllegalStateException { @@ -391,24 +379,78 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes } public MessageProducer getMessageProducer() throws JMSException { - if (messageProducer == null) { - messageProducer = getInternalSession().createProducer(null); + return getMessageProducer(null); + } + + public MessageProducer getMessageProducer(Destination destination) throws JMSException { + MessageProducer result = null; + + if (useAnonymousProducers) { + if (producer == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (producer == null) { + producer = getInternalSession().createProducer(null); + } + } + } + + result = producer; + } else { + result = getInternalSession().createProducer(destination); } - return messageProducer; + + return result; } public QueueSender getQueueSender() throws JMSException { - if (queueSender == null) { - queueSender = ((QueueSession)getInternalSession()).createSender(null); + return getQueueSender(null); + } + + public QueueSender getQueueSender(Queue destination) throws JMSException { + QueueSender result = null; + + if (useAnonymousProducers) { + if (sender == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (sender == null) { + sender = ((QueueSession) getInternalSession()).createSender(null); + } + } + } + + result = sender; + } else { + result = ((QueueSession) getInternalSession()).createSender(destination); } - return queueSender; + + return result; } public TopicPublisher getTopicPublisher() throws JMSException { - if (topicPublisher == null) { - topicPublisher = ((TopicSession)getInternalSession()).createPublisher(null); + return getTopicPublisher(null); + } + + public TopicPublisher getTopicPublisher(Topic destination) throws JMSException { + TopicPublisher result = null; + + if (useAnonymousProducers) { + if (publisher == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (publisher == null) { + publisher = ((TopicSession) getInternalSession()).createPublisher(null); + } + } + } + + result = publisher; + } else { + result = ((TopicSession) getInternalSession()).createPublisher(destination); } - return topicPublisher; + + return result; } private QueueBrowser addQueueBrowser(QueueBrowser browser) { @@ -418,9 +460,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private MessageConsumer addConsumer(MessageConsumer consumer) { consumers.add(consumer); - // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is - // invoked when the returned consumer is closed, to avoid memory leak in this - // session class in case many consumers is created + // must wrap in PooledMessageConsumer to ensure the onConsumerClose + // method is invoked when the returned consumer is closed, to avoid memory + // leak in this session class in case many consumers is created return new PooledMessageConsumer(this, consumer); } @@ -442,4 +484,18 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes public String toString() { return "PooledSession { " + session + " }"; } + + /** + * Callback invoked when the consumer is closed. + *

+ * This is used to keep track of an explicit closed consumer created by this + * session, by which we know do not need to keep track of the consumer, as + * its already closed. + * + * @param consumer + * the consumer which is being closed + */ + protected void onConsumerClose(MessageConsumer consumer) { + consumers.remove(consumer); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java new file mode 100644 index 0000000..6671376 --- /dev/null +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.jms.pool; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.fail; + +import javax.jms.Queue; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class PooledSessionNoPublisherCachingTest { + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private PooledConnectionFactory pooledFactory; + private String connectionUri; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + connectionUri = connector.getPublishableConnectString(); + factory = new ActiveMQConnectionFactory(connectionUri); + pooledFactory = new PooledConnectionFactory(); + pooledFactory.setConnectionFactory(factory); + pooledFactory.setMaxConnections(1); + pooledFactory.setBlockIfSessionPoolIsFull(false); + pooledFactory.setUseAnonymousProducers(false); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + + @Test + public void testMessageProducersAreUnique() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledProducer producer1 = (PooledProducer) session.createProducer(queue1); + PooledProducer producer2 = (PooledProducer) session.createProducer(queue2); + + assertNotSame(producer1.getMessageProducer(), producer2.getMessageProducer()); + } + + @Test + public void testThrowsWhenDestinationGiven() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledProducer producer = (PooledProducer) session.createProducer(queue1); + + try { + producer.send(queue2, session.createTextMessage()); + fail("Should only be able to send to queue 1"); + } catch (Exception ex) { + } + + try { + producer.send(null, session.createTextMessage()); + fail("Should only be able to send to queue 1"); + } catch (Exception ex) { + } + } + + @Test + public void testCreateTopicPublisher() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic1 = session.createTopic("Topic-1"); + Topic topic2 = session.createTopic("Topic-2"); + + PooledTopicPublisher publisher1 = (PooledTopicPublisher) session.createPublisher(topic1); + PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2); + + assertNotSame(publisher1.getMessageProducer(), publisher2.getMessageProducer()); + } + + @Test + public void testQueueSender() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledQueueSender sender1 = (PooledQueueSender) session.createSender(queue1); + PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2); + + assertNotSame(sender1.getMessageProducer(), sender2.getMessageProducer()); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java index a60d053..7483e6b 100644 --- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java @@ -17,8 +17,14 @@ package org.apache.activemq.jms.pool; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import javax.jms.Queue; +import javax.jms.QueueSession; import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -68,4 +74,69 @@ public class PooledSessionTest { assertEquals(1, connection.getNumtIdleSessions()); assertEquals(1, connection.getNumSessions()); } + + @Test + public void testMessageProducersAreAllTheSame() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledProducer producer1 = (PooledProducer) session.createProducer(queue1); + PooledProducer producer2 = (PooledProducer) session.createProducer(queue2); + + assertSame(producer1.getMessageProducer(), producer2.getMessageProducer()); + } + + @Test + public void testThrowsWhenDifferentDestinationGiven() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledProducer producer = (PooledProducer) session.createProducer(queue1); + + try { + producer.send(queue2, session.createTextMessage()); + fail("Should only be able to send to queue 1"); + } catch (Exception ex) { + } + + try { + producer.send(null, session.createTextMessage()); + fail("Should only be able to send to queue 1"); + } catch (Exception ex) { + } + } + + @Test + public void testCreateTopicPublisher() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic1 = session.createTopic("Topic-1"); + Topic topic2 = session.createTopic("Topic-2"); + + PooledTopicPublisher publisher1 = (PooledTopicPublisher) session.createPublisher(topic1); + PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2); + + assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer()); + } + + @Test + public void testQueueSender() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session.createTemporaryQueue(); + Queue queue2 = session.createTemporaryQueue(); + + PooledQueueSender sender1 = (PooledQueueSender) session.createSender(queue1); + PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2); + + assertSame(sender1.getMessageProducer(), sender2.getMessageProducer()); + } }