Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 39092 invoked from network); 8 Oct 2007 16:17:43 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Oct 2007 16:17:43 -0000 Received: (qmail 83984 invoked by uid 500); 8 Oct 2007 16:15:58 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 83976 invoked by uid 500); 8 Oct 2007 16:15:58 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 83963 invoked by uid 99); 8 Oct 2007 16:15:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Oct 2007 09:15:58 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Oct 2007 16:16:02 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D60B11A9838; Mon, 8 Oct 2007 09:15:11 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r582861 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache: qpid/client/ qpidity/nclient/interop/ qpidity/njms/ Date: Mon, 08 Oct 2007 16:15:08 -0000 To: qpid-commits@incubator.apache.org From: rajith@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071008161511.D60B11A9838@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajith Date: Mon Oct 8 09:15:07 2007 New Revision: 582861 URL: http://svn.apache.org/viewvc?rev=582861&view=rev Log: Changed to use window mode for async consume and credit mode for sync consume. 
Also added logic to change the mode when suspend is called and when the message listener is set to null. Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=582861&r1=582860&r2=582861&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Oct 8 09:15:07 2007 @@ -344,7 +344,7 @@ new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); - getQpidSession().messageFlowMode(tag.toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + // We need to sync so that we get notify of an error. 
getQpidSession().sync(); getCurrentException(); @@ -437,17 +437,30 @@ for (BasicMessageConsumer consumer : _consumers.values()) { getQpidSession().messageStop(consumer.getConsumerTag().toString()); + getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + } } else { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, - MAX_PREFETCH); - // todo this - getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + //only set if msg list is null + try + { + if (consumer.getMessageListener() != null) + { + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, + MAX_PREFETCH); + // todo this + getQpidSession() + .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + } + } + catch(Exception e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error while trying to get the listener",e); + } } } // We need to sync so that we get notify of an error. Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=582861&r1=582860&r2=582861&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Oct 8 09:15:07 2007 @@ -82,7 +82,7 @@ * Used in the blocking receive methods to receive a message from the Session thread.

Or to notify of errors *

Argument true indicates we want strict FIFO semantics */ - private final ArrayBlockingQueue _synchronousQueue; + protected final ArrayBlockingQueue _synchronousQueue; protected MessageFactoryRegistry _messageFactory; @@ -354,15 +354,7 @@ return null; } - Object o ; - if (l > 0) - { - o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); - } - else - { - o = _synchronousQueue.take(); - } + Object o = getMessageFromQueue(l); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -384,6 +376,8 @@ releaseReceiving(); } } + + public abstract Object getMessageFromQueue(long l) throws InterruptedException; private boolean closeOnAutoClose() throws JMSException { Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=582861&r1=582860&r2=582861&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Oct 8 09:15:07 2007 @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,6 +29,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpidity.api.Message; +import org.apache.qpidity.nclient.Session; import org.apache.qpidity.transport.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.filter.MessageFilter; @@ -38,6 +39,7 @@ import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; /** * This is a 0.10 message consumer. @@ -65,7 +67,7 @@ */ private boolean _preAcquire = true; - //--- constructor + //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, @@ -127,7 +129,7 @@ public void onMessage(Message message) - { + { int channelId = getSession().getChannelId(); long deliveryId = message.getMessageTransferId(); String consumerTag = getConsumerTag().toString(); @@ -215,7 +217,7 @@ private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException { boolean messageOk = true; - // TODO Use a tag for fiding out if message filtering is done here or by the broker. + // TODO Use a tag for fiding out if message filtering is done here or by the broker. 
try { if (getMessageSelector() != null) @@ -334,15 +336,45 @@ public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - if (_connection.started()) + if (messageListener == null) + { + _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); + _0_10session.getQpidSession().sync(); + } + else + { + if (_connection.started()) + { + _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, + AMQSession_0_10.MAX_PREFETCH); + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); + _0_10session.getQpidSession().sync(); + } + } + } + + public Object getMessageFromQueue(long l) throws InterruptedException + { + Object o; + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); + + if (l > 0) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - AMQSession_0_10.MAX_PREFETCH); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, - 0xFFFFFFFF); + o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); + o = _synchronousQueue.poll(); + } + else + { + o = _synchronousQueue.take(); } + return null; } } Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=582861&r1=582860&r2=582861&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Mon Oct 8 09:15:07 2007 @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.util.concurrent.TimeUnit; + import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -84,4 +86,18 @@ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); } + + public Object getMessageFromQueue(long l) throws InterruptedException + { + Object o; + if (l > 0) + { + o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + } + else + { + o = _synchronousQueue.take(); + } + return null; + } } Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java?rev=582861&r1=582860&r2=582861&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java Mon Oct 8 09:15:07 2007 @@ -147,6 +147,5 @@ t.testSendMessage(); t.testMessageFlush(); t.close(); - } } Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java?rev=582861&r1=582860&r2=582861&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java Mon Oct 8 09:15:07 2007 @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,6 +28,7 @@ import org.apache.qpidity.QpidException; import org.apache.qpidity.nclient.MessagePartListener; +import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.exchange.ExchangeDefaults; import org.apache.qpidity.filter.JMSSelectorFilter; @@ -159,7 +160,7 @@ // bind this queue with the topic exchange getSession().getQpidSession() .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null); - // subscribe to this topic + // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, @@ -174,12 +175,21 @@ // set the flow mode getSession().getQpidSession() .messageFlowMode(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_MODE_CREDIT); + // Set unlimited byte credits + getSession().getQpidSession().messageFlow(getMessageActorID(), Session.MESSAGE_FLOW_UNIT_BYTE, -1); // this will prevent the broker from 
sending more than one message // When a messageListener is set the flow will be adjusted. // until then we assume it's for synchronous message consumption requestCredit(1); - requestSync(); + try + { + requestSync(); + } + catch(Exception e) + { + e.printStackTrace(); + } // check for an exception if (getSession().getCurrentException() != null) {