Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 90701 invoked from network); 20 Jul 2009 19:04:30 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 20 Jul 2009 19:04:30 -0000 Received: (qmail 70522 invoked by uid 500); 20 Jul 2009 19:05:35 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 70485 invoked by uid 500); 20 Jul 2009 19:05:35 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 70476 invoked by uid 99); 20 Jul 2009 19:05:35 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jul 2009 19:05:35 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jul 2009 19:05:31 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 532CA23888E2; Mon, 20 Jul 2009 19:05:11 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r795958 [2/3] - in /qpid/branches/java-broker-0-10/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apache/q... 
Date: Mon, 20 Jul 2009 19:05:08 -0000 To: commits@qpid.apache.org From: rgodfrey@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090720190511.532CA23888E2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Mon Jul 20 19:05:05 2009 @@ -36,7 +36,7 @@ this.xpath = xpath; } - public Object evaluate(Filterable message) throws AMQException { + public Object evaluate(Filterable message) { return Boolean.FALSE; } @@ -49,7 +49,7 @@ * @return true if the expression evaluates to Boolean.TRUE. 
* @throws AMQException */ - public boolean matches(Filterable message) throws AMQException + public boolean matches(Filterable message) { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Mon Jul 20 19:05:05 2009 @@ -43,7 +43,7 @@ this.xpath = xpath; } - public boolean evaluate(Filterable m) throws AMQException + public boolean evaluate(Filterable m) { // TODO - we would have to check the content type and then evaluate the content // here... is this really a feature we wish to implement? 
- RobG Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java Mon Jul 20 19:05:05 2009 @@ -1,11 +1,8 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.Set; -import java.util.HashSet; /* * @@ -52,7 +49,7 @@ return _bytesCredit.get() > 0L; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(ServerMessage msg) { final long msgSize = msg.getSize(); if(hasCredit()) Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Mon Jul 20 19:05:05 2009 @@ -1,6 +1,7 @@ package org.apache.qpid.server.flow; import org.apache.qpid.server.queue.AMQMessage; +import 
org.apache.qpid.server.message.ServerMessage; /* * @@ -40,5 +41,5 @@ public boolean hasCredit(); - public boolean useCreditForMessage(AMQMessage msg); + public boolean useCreditForMessage(ServerMessage msg); } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java Mon Jul 20 19:05:05 2009 @@ -1,6 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -37,7 +37,7 @@ return true; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(ServerMessage msg) { return true; } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java Mon Jul 20 19:05:05 2009 @@ -1,8 +1,6 @@ package org.apache.qpid.server.flow; -import 
org.apache.qpid.server.queue.AMQMessage; - -import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -54,7 +52,7 @@ return (_messageCredit > 0L) && ( _bytesCredit > 0L ); } - public synchronized boolean useCreditForMessage(AMQMessage msg) + public synchronized boolean useCreditForMessage(ServerMessage msg) { if(_messageCredit == 0L) { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java Mon Jul 20 19:05:05 2009 @@ -1,6 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; @@ -50,7 +50,7 @@ return _messageCredit.get() > 0L; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(ServerMessage msg) { if(hasCredit()) { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java (original) +++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java Mon Jul 20 19:05:05 2009 @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager { @@ -123,7 +123,7 @@ && (_messageCreditLimit == 0L || _messageCredit > 0); } - public synchronized boolean useCreditForMessage(final AMQMessage msg) + public synchronized boolean useCreditForMessage(final ServerMessage msg) { if(_messageCreditLimit != 0L) { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Mon Jul 20 19:05:05 2009 @@ -41,6 +41,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -130,8 +131,16 @@ throws AMQException { singleMessageCredit.useCreditForMessage(entry.getMessage()); - session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(), - deliveryTag, queue.getMessageCount()); + if(entry.getMessage() 
instanceof AMQMessage) + { + session.getProtocolOutputConverter().writeGetOk((AMQMessage)(entry.getMessage()), channel.getChannelId(), + deliveryTag, queue.getMessageCount()); + } + else + { + //TODO Convert AMQP 0-10 message + throw new RuntimeException("Not implemented conversion of 0-10 message"); + } } }; Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Mon Jul 20 19:05:05 2009 @@ -87,7 +87,7 @@ return; } - if (!message.getMessage().isReferenced()) + if (message.getMessage() == null) { _logger.warn("Message as already been purged, unable to Reject."); return; @@ -96,7 +96,7 @@ if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Mon Jul 20 19:05:05 2009 @@ -26,6 +26,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -61,7 +62,7 @@ { throw body.getChannelNotFoundException(channelId); } - + StoreContext.setCurrentContext(channel.getStoreContext()); channel.commit(); MethodRegistry methodRegistry = session.getMethodRegistry(); @@ -74,5 +75,9 @@ { throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } + finally + { + StoreContext.clearCurrentContext(); + } } } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Mon Jul 20 19:05:05 2009 @@ -36,8 +36,6 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.AMQException; -import org.apache.mina.common.ByteBuffer; - import java.util.Iterator; public class 
ProtocolOutputConverterImpl implements ProtocolOutputConverter Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Mon Jul 20 19:05:05 2009 @@ -1,25 +1,25 @@ package org.apache.qpid.server.output.amqp0_9; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + import org.apache.mina.common.ByteBuffer; Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jul 20 19:05:05 2009 @@ -33,6 +33,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.message.*; import java.util.Iterator; @@ -41,12 +42,12 @@ /** * A deliverable message. */ -public class AMQMessage implements Filterable +public class AMQMessage implements Filterable, ServerMessage { /** Used for debugging purposes. 
*/ private static final Logger _log = Logger.getLogger(AMQMessage.class); - private final AtomicInteger _referenceCount = new AtomicInteger(1); + private final AtomicInteger _referenceCount = new AtomicInteger(0); private final AMQMessageHandle _messageHandle; @@ -72,7 +73,7 @@ private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); - + private final AMQMessageHeader _messageHeader; /** @@ -202,6 +203,7 @@ _messageHandle = factory.createMessageHandle(messageId, store, true); _storeContext = txnConext.getStoreContext(); _size = _messageHandle.getBodySize(txnConext.getStoreContext()); + _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(txnConext.getStoreContext())); } /** @@ -221,6 +223,7 @@ { _messageHandle = messageHandle; _storeContext = storeConext; + _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(storeConext)); if(info.isImmediate()) { @@ -234,6 +237,7 @@ protected AMQMessage(AMQMessage msg) throws AMQException { _messageHandle = msg._messageHandle; + _messageHeader = msg._messageHeader; _storeContext = msg._storeContext; _flags = msg._flags; _size = msg._size; @@ -315,12 +319,11 @@ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. 
* - * @param storeContext * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException + public void decrementReference() throws MessageCleanupException { int count = _referenceCount.decrementAndGet(); @@ -342,13 +345,12 @@ // and the handle has not yet been constructed if (_messageHandle != null) { - _messageHandle.removeMessage(storeContext); + _messageHandle.removeMessage(StoreContext.getCurrentContext()); } } catch (AMQException e) { - // to maintain consistency, we revert the count - incrementReference(); + throw new MessageCleanupException(getMessageId(), e); } } @@ -373,7 +375,18 @@ return (_flags & DELIVERED_TO_CONSUMER) != 0; } - public boolean isPersistent() throws AMQException + public String getRoutingKey() + { + // TODO + return null; + } + + public AMQMessageHeader getMessageHeader() + { + return _messageHeader; + } + + public boolean isPersistent() { return _messageHandle.isPersistent(); } @@ -455,6 +468,26 @@ } + public boolean isImmediate() + { + return (_flags & IMMEDIATE) == IMMEDIATE; + } + + public long getExpiration() + { + return _expiration; + } + + public MessageReference newReference() + { + return new AMQMessageReference(this); + } + + public Long getMessageNumber() + { + return getMessageId(); + } + public Object getPublisherClientInstance() { //todo store sessionIdentifier/client id with message in store Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java 
(original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jul 20 19:05:05 2009 @@ -20,15 +20,13 @@ */ package org.apache.qpid.server.queue; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -88,7 +86,7 @@ int delete() throws AMQException; - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; + QueueEntry enqueue(ServerMessage message) throws AMQException; void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Jul 20 19:05:05 2009 @@ -36,6 +36,7 @@ import org.apache.qpid.server.management.AMQManagedObject; import 
org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.message.ServerMessage; import javax.management.JMException; import javax.management.MBeanException; @@ -246,7 +247,7 @@ /** * Checks if there is any notification to be send to the listeners */ - public void checkForNotification(AMQMessage msg) throws AMQException + public void checkForNotification(ServerMessage msg) throws AMQException { final Set notificationChecks = _queue.getNotificationChecks(); @@ -333,48 +334,60 @@ throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } - AMQMessage msg = entry.getMessage(); - // get message content - Iterator cBodies = msg.getContentBodyIterator(); - List msgContent = new ArrayList(); - while (cBodies.hasNext()) + ServerMessage serverMsg = entry.getMessage(); + + if(serverMsg instanceof AMQMessage) { - ContentChunk body = cBodies.next(); - if (body.getSize() != 0) + AMQMessage msg = (AMQMessage) serverMsg; + // get message content + Iterator cBodies = msg.getContentBodyIterator(); + List msgContent = new ArrayList(); + while (cBodies.hasNext()) { + ContentChunk body = cBodies.next(); if (body.getSize() != 0) { - ByteBuffer slice = body.getData().slice(); - for (int j = 0; j < slice.limit(); j++) + if (body.getSize() != 0) { - msgContent.add(slice.get()); + ByteBuffer slice = body.getData().slice(); + for (int j = 0; j < slice.limit(); j++) + { + msgContent.add(slice.get()); + } } } } - } - try - { - // Create header attributes list - CommonContentHeaderProperties headerProperties = - (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) + + try { - AMQShortString mimeTypeShortSting = headerProperties.getContentType(); - mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); - encoding = (headerProperties.getEncoding() == null) ? 
"" : headerProperties.getEncoding().toString(); - } + // Create header attributes list + CommonContentHeaderProperties headerProperties = + (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; + String mimeType = null, encoding = null; + if (headerProperties != null) + { + AMQShortString mimeTypeShortSting = headerProperties.getContentType(); + mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); + encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); + } - Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + + return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues); + } + catch (AMQException e) + { + JMException jme = new JMException("Error creating header attributes list: " + e); + jme.initCause(e); + throw jme; + } - return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues); } - catch (AMQException e) + else { - JMException jme = new JMException("Error creating header attributes list: " + e); - jme.initCause(e); - throw jme; + // TODO 0-10 Messages for MBean + return null; } } @@ -398,13 +411,21 @@ for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { long position = i; - AMQMessage msg = list.get(i - 1).getMessage(); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position}; - CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); - _messageList.put(messageData); + ServerMessage serverMsg = list.get(i - 1).getMessage(); + if(serverMsg instanceof AMQMessage) + { + 
AMQMessage msg = (AMQMessage) serverMsg; + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + String[] headerAttributes = getMessageHeaderProperties(headerBody); + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position }; + CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); + _messageList.put(messageData); + } + else + { + // TODO 0-10 Message + } } } catch (AMQException e) Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Mon Jul 20 19:05:05 2009 @@ -68,4 +68,9 @@ { return _queueMap.values(); } + + public AMQQueue getQueue(String queue) + { + return getQueue(new AMQShortString(queue)); + } } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Mon Jul 20 19:05:05 
2009 @@ -22,12 +22,13 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.AMQMessageHeader; -public interface Filterable +public interface Filterable { - ContentHeaderBody getContentHeaderBody() throws E; + AMQMessageHeader getMessageHeader(); - boolean isPersistent() throws E; + boolean isPersistent(); boolean isRedelivered(); } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Mon Jul 20 19:05:05 2009 @@ -28,16 +28,20 @@ import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ContentHeaderBodyAdapter; +import org.apache.qpid.server.message.AMQMessageReference; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import java.util.ArrayList; -import java.util.Collection; -public class IncomingMessage implements Filterable +public class IncomingMessage implements Filterable, InboundMessage { /** Used for debugging purposes. 
*/ @@ -73,6 +77,7 @@ private long _expiration; private Exchange _exchange; + private AMQMessageHeader _messageHeader; public IncomingMessage(final Long messageId, @@ -90,6 +95,7 @@ public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException { _contentHeaderBody = contentHeaderBody; + _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody); } public void setExpiration() @@ -158,17 +164,19 @@ } AMQMessage message = null; + AMQMessageReference ref = null; try { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), - _messagePublishInfo, getContentHeaderBody()); + _messagePublishInfo, getContentHeader()); message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo); + ref = (AMQMessageReference) message.newReference(); message.setExpiration(_expiration); message.setClientIdentifier(_publisher.getSessionIdentifier()); @@ -177,8 +185,8 @@ // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ? - ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null; + AMQShortString userID = getContentHeader().properties instanceof BasicContentHeaderProperties ? + ((BasicContentHeaderProperties) getContentHeader().properties).getUserId() : null; if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString())) { @@ -202,7 +210,7 @@ { int offset; final int queueCount = _destinationQueues.size(); - message.incrementReference(queueCount); + if(queueCount == 1) { offset = 0; @@ -233,7 +241,8 @@ finally { // Remove refence for routing process . 
Reference count should now == delivered queue count - if(message != null) message.decrementReference(_txnContext.getStoreContext()); + + if(ref != null) ref.release(); } } @@ -250,40 +259,51 @@ public boolean allContentReceived() { - return (_bodyLengthReceived == getContentHeaderBody().bodySize); + return (_bodyLengthReceived == getContentHeader().bodySize); } - public AMQShortString getExchange() throws AMQException + public AMQShortString getExchange() { return _messagePublishInfo.getExchange(); } - public AMQShortString getRoutingKey() throws AMQException + public String getRoutingKey() { - return _messagePublishInfo.getRoutingKey(); + return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); } - public boolean isMandatory() throws AMQException + public String getBinding() + { + return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); + } + + + public boolean isMandatory() { return _messagePublishInfo.isMandatory(); } - public boolean isImmediate() throws AMQException + public boolean isImmediate() { return _messagePublishInfo.isImmediate(); } - public ContentHeaderBody getContentHeaderBody() + public ContentHeaderBody getContentHeader() { return _contentHeaderBody; } + public AMQMessageHeader getMessageHeader() + { + return _messageHeader; + } + public boolean isPersistent() { - return getContentHeaderBody().properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == + return getContentHeader().properties instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } @@ -292,6 +312,11 @@ return false; } + public long getSize() + { + return getContentHeader().bodySize; + } + public void setMessageStore(final MessageStore messageStore) { _messageStore = messageStore; @@ 
-309,7 +334,8 @@ public void route() throws AMQException { - _exchange.route(this); + enqueue(_exchange.route(this)); + } public void enqueue(final ArrayList queues) Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Mon Jul 20 19:05:05 2009 @@ -22,6 +22,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.message.ServerMessage; /** * NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate @@ -35,7 +36,7 @@ */ public class NoConsumersException extends RequiredDeliveryException { - public NoConsumersException(AMQMessage message) + public NoConsumersException(ServerMessage message) { super("Immediate delivery is not possible.", message); } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original) +++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Mon Jul 20 19:05:05 2009 @@ -21,13 +21,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; public enum NotificationCheck { MESSAGE_COUNT_ALERT { - boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener) { int msgCount; final long maximumMessageCount = queue.getMaximumMessageCount(); @@ -41,26 +42,19 @@ }, MESSAGE_SIZE_ALERT(true) { - boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener) { final long maximumMessageSize = queue.getMaximumMessageSize(); if(maximumMessageSize != 0) { // Check for threshold message size long messageSize; - try - { - messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize; - } - catch (AMQException e) - { - messageSize = 0; - } + messageSize = (msg == null) ? 0 : msg.getSize(); if (messageSize >= maximumMessageSize) { - listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]"); + listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. 
[Message ID=" + msg.getMessageNumber() + "]"); return true; } } @@ -70,7 +64,7 @@ }, QUEUE_DEPTH_ALERT { - boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener) { // Check for threshold queue depth in bytes final long maximumQueueDepth = queue.getMaximumQueueDepth(); @@ -91,7 +85,7 @@ }, MESSAGE_AGE_ALERT { - boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener) { final long maxMessageAge = queue.getMaximumMessageAge(); @@ -133,6 +127,6 @@ return _messageSpecific; } - abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener); + abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener); } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Mon Jul 20 19:05:05 2009 @@ -22,6 +22,7 @@ import org.apache.qpid.framing.CommonContentHeaderProperties; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; public class PriorityQueueList implements QueueEntryList { @@ -52,26 +53,18 @@ return _queue; } - public QueueEntry add(AMQMessage message) + public QueueEntry add(ServerMessage message) { - try + 
int index = message.getMessageHeader().getPriority() - _priorityOffset; + if(index >= _priorities) { - int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; - if(index >= _priorities) - { - index = _priorities-1; - } - else if(index < 0) - { - index = 0; - } - return _priorityLists[index].add(message); + index = _priorities-1; } - catch (AMQException e) + else if(index < 0) { - // TODO - fix AMQ Exception - throw new RuntimeException(e); + index = 0; } + return _priorityLists[index].add(message); } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Jul 20 19:05:05 2009 @@ -3,6 +3,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -133,7 +134,7 @@ AMQQueue getQueue(); - AMQMessage getMessage(); + ServerMessage getMessage(); long getSize(); @@ -155,8 +156,6 @@ void release(); - String debugIdentity(); - boolean immediateAndNotDelivered(); void setRedelivered(boolean b); Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=795958&r1=795957&r2=795958&view=diff 
============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Jul 20 19:05:05 2009 @@ -23,6 +23,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageReference; import org.apache.log4j.Logger; import java.util.Set; @@ -42,7 +44,7 @@ private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; + private MessageReference _message; private Set _rejectedBy = null; @@ -75,6 +77,8 @@ private volatile long _entryId; volatile QueueEntryImpl _next; + private boolean _deliveredToConsumer; + private boolean _redelivered; QueueEntryImpl(SimpleQueueEntryList queueEntryList) @@ -84,18 +88,18 @@ } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId) + public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) { _queueEntryList = queueEntryList; - _message = message; + _message = message == null ? null : message.newReference(); _entryIdUpdater.set(this, entryId); } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message) + public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; - _message = message; + _message = message == null ? null : message.newReference(); } protected void setEntryId(long entryId) @@ -113,9 +117,9 @@ return _queueEntryList.getQueue(); } - public AMQMessage getMessage() + public ServerMessage getMessage() { - return _message; + return _message == null ? 
null : _message.getMessage(); } public long getSize() @@ -125,12 +129,21 @@ public boolean getDeliveredToConsumer() { - return getMessage().getDeliveredToConsumer(); + return _deliveredToConsumer; } public boolean expired() throws AMQException { - return getMessage().expired(getQueue()); + long expiration = getMessage().getExpiration(); + if (expiration != 0L) + { + long now = System.currentTimeMillis(); + + return (now > expiration); + } + + return false; + } public boolean isAcquired() @@ -167,7 +180,7 @@ public void setDeliveredToSubscription() { - getMessage().setDeliveredToConsumer(); + _deliveredToConsumer = true; } public void release() @@ -175,20 +188,15 @@ _stateUpdater.set(this,AVAILABLE_STATE); } - public String debugIdentity() - { - return getMessage().debugIdentity(); - } - public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return getMessage().isImmediate() && !_deliveredToConsumer; } public void setRedelivered(boolean b) { - getMessage().setRedelivered(b); + _redelivered = b; } public Subscription getDeliveredSubscription() @@ -223,7 +231,7 @@ } else { - _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); + _log.warn("Requesting rejection by null subscriber:" + this); } } @@ -284,7 +292,9 @@ { if(delete()) { - getMessage().decrementReference(storeContext); + StoreContext sc = StoreContext.setCurrentContext(storeContext); + _message.release(); + StoreContext.setCurrentContext(sc); } } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original) +++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Mon Jul 20 19:05:05 2009 @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.ServerMessage; + public interface QueueEntryList { AMQQueue getQueue(); - QueueEntry add(AMQMessage message); + QueueEntry add(ServerMessage message); QueueEntry next(QueueEntry node); Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Mon Jul 20 19:05:05 2009 @@ -40,4 +40,5 @@ Collection getQueues(); + AMQQueue getQueue(String queue); } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Jul 20 19:05:05 2009 @@ -29,6 +29,8 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; +import 
org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -319,7 +321,7 @@ // ------ Enqueue / Dequeue - public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException + public QueueEntry enqueue(ServerMessage message) throws AMQException { incrementQueueCount(); @@ -406,8 +408,10 @@ } } + if (entry.immediateAndNotDelivered()) { + StoreContext storeContext = StoreContext.getCurrentContext(); dequeue(storeContext, entry); entry.dispose(storeContext); } @@ -462,7 +466,7 @@ // Simple Queues don't :-) } - private void incrementQueueSize(final AMQMessage message) + private void incrementQueueSize(final ServerMessage message) { getAtomicQueueSize().addAndGet(message.getSize()); } @@ -573,10 +577,10 @@ try { - AMQMessage msg = entry.getMessage(); + ServerMessage msg = entry.getMessage(); if (msg.isPersistent()) { - _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId()); + _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageNumber()); } //entry.dispose(storeContext); @@ -767,7 +771,7 @@ public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessage().getMessageNumber(); return messageId >= fromMessageId && messageId <= toMessageId; } @@ -786,7 +790,7 @@ public boolean accept(QueueEntry entry) { - _complete = entry.getMessage().getMessageId() == messageId; + _complete = entry.getMessage().getMessageNumber() == messageId; return _complete; } @@ -828,7 +832,7 @@ public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessage().getMessageNumber(); return (messageId >= fromMessageId) && (messageId <= toMessageId) && entry.acquire(); @@ -847,11 +851,11 @@ // Move the messages in on the message store. 
for (QueueEntry entry : entries) { - AMQMessage message = entry.getMessage(); + ServerMessage message = entry.getMessage(); if (message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue, message.getMessageId()); + store.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); } // dequeue does not decrement the refence count entry.dequeue(storeContext); @@ -882,9 +886,11 @@ try { + StoreContext.setCurrentContext(storeContext); + for (QueueEntry entry : entries) { - toQueue.enqueue(storeContext, entry.getMessage()); + toQueue.enqueue(entry.getMessage()); entry.delete(); } } @@ -896,6 +902,11 @@ { throw new RuntimeException(e); } + finally + { + StoreContext.clearCurrentContext(); + + } } @@ -912,17 +923,9 @@ public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); - if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) - { - if (!entry.isDeleted()) - { - return entry.getMessage().incrementReference(); - } - } - - return false; + final long messageId = entry.getMessage().getMessageNumber(); + return ((messageId >= fromMessageId) + && (messageId <= toMessageId)); } public boolean filterComplete() @@ -938,11 +941,15 @@ // Move the messages in on the message store. 
for (QueueEntry entry : entries) { - AMQMessage message = entry.getMessage(); + ServerMessage message = entry.getMessage(); - if (message.isReferenced() && message.isPersistent() && toQueue.isDurable()) + if (message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue, message.getMessageId()); + + StoreContext sc = StoreContext.setCurrentContext(storeContext); + store.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); + StoreContext.setCurrentContext(sc); + } } @@ -973,9 +980,11 @@ { for (QueueEntry entry : entries) { - if (entry.getMessage().isReferenced()) + + ServerMessage message = entry.getMessage(); + if (message != null) { - toQueue.enqueue(storeContext, entry.getMessage()); + toQueue.enqueue(entry.getMessage()); } } } @@ -1001,7 +1010,7 @@ { QueueEntry node = queueListIterator.getNode(); - final long messageId = node.getMessage().getMessageId(); + final long messageId = node.getMessage().getMessageNumber(); if ((messageId >= fromMessageId) && (messageId <= toMessageId) @@ -1418,7 +1427,7 @@ } } - @Override + public void checkMessageStatus() throws AMQException { @@ -1581,7 +1590,7 @@ for (int i = 0; i < num && !it.atTail(); i++) { it.advance(); - ids.add(it.getNode().getMessage().getMessageId()); + ids.add(it.getNode().getMessage().getMessageNumber()); } return ids; } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java 
Mon Jul 20 19:05:05 2009 @@ -1,5 +1,8 @@ package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; + import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /* @@ -74,7 +77,7 @@ } - public QueueEntry add(AMQMessage message) + public QueueEntry add(ServerMessage message) { QueueEntryImpl node = new QueueEntryImpl(this, message); for (;;) Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Mon Jul 20 19:05:05 2009 @@ -1352,7 +1352,10 @@ public void process() throws AMQException { - _queue.enqueue(_context, _message); + StoreContext.setCurrentContext(_context); + _queue.enqueue(_message); + StoreContext.clearCurrentContext(); + } @@ -1414,7 +1417,7 @@ if(message != null) { - message.incrementReference(); +// message.incrementReference(); } else { Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original) +++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Mon Jul 20 19:05:05 2009 @@ -32,9 +32,12 @@ { private static final Logger _logger = Logger.getLogger(StoreContext.class); + private static final ThreadLocal _threadLocalContext = new ThreadLocal(); + private String _name; private Object _payload; + public StoreContext() { _name = "StoreContext"; @@ -68,4 +71,24 @@ { return "<_name = " + _name + ", _payload = " + _payload + ">"; } + + + public static StoreContext setCurrentContext(StoreContext context) + { + StoreContext sc = getCurrentContext(); + _threadLocalContext.set(context); + return sc; + } + + public static StoreContext getCurrentContext() + { + return _threadLocalContext.get(); + } + + public static void clearCurrentContext() + { + _threadLocalContext.set(null); + } + + } Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=795958&r1=795957&r2=795958&view=diff ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original) +++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Jul 20 19:05:05 2009 @@ -32,8 +32,10 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.flow.FlowCreditManager; 
import org.apache.qpid.server.filter.FilterManager; @@ -377,16 +379,19 @@ { if (_logger.isDebugEnabled()) { - _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity()); + _logger.debug("Subscription:" + this + " rejected message:" + entry); } // return false; } if (_noLocal) { - //todo - client id should be recoreded so we don't have to handle + + AMQMessage message = (AMQMessage) entry.getMessage(); + + //todo - client id should be recorded so we don't have to handle // the case where this is null. - final Object publisherId = entry.getMessage().getPublisherClientInstance(); + final Object publisherId = message.getPublisherClientInstance(); // We don't want local messages so check to see if message is one we sent Object localInstance; @@ -404,8 +409,8 @@ localInstance = getProtocolSession().getClientIdentifier(); - //todo - client id should be recoreded so we don't have to do the null check - if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier())) + //todo - client id should be recorded so we don't have to do the null check + if (localInstance != null && localInstance.equals(message.getPublisherIdentifier())) { return false; } @@ -417,7 +422,7 @@ if (_logger.isDebugEnabled()) { - _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); + _logger.debug("(" + this + ") checking filters for message (" + entry); } return checkFilters(entry); Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=795958&view=auto ============================================================================== --- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (added) +++ 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Jul 20 19:05:05 2009 @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Method; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ServerConnection extends Connection +{ + @Override + protected void invoke(Method method) + { + super.invoke(method); + } + + @Override + protected void setState(State state) + { + super.setState(state); + } + + @Override + public ServerConnectionDelegate getConnectionDelegate() + { + return (ServerConnectionDelegate) super.getConnectionDelegate(); + } + + public void setConnectionDelegate(ServerConnectionDelegate delegate) + { + super.setConnectionDelegate(delegate); + } + + private VirtualHost _virtualHost; + + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } +} Added: 
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import java.util.*;
+
+
+/**
+ * Connection-level delegate used by the broker. It passes the SASL mechanism
+ * names reported by the authentication manager to ServerDelegate, creates a
+ * ServerSession for each session attach, and binds a connection to a virtual
+ * host when the peer opens it.
+ */
+public class ServerConnectionDelegate extends ServerDelegate
+{
+
+    private final String _localFQDN;
+    private final IApplicationRegistry _appRegistry;
+
+
+    /**
+     * Creates a delegate with an empty property map and "en_US" as the only locale.
+     *
+     * @param appRegistry registry providing the authentication manager and virtual hosts
+     * @param localFQDN name passed to the authentication manager when creating a SaslServer
+     */
+    public ServerConnectionDelegate(IApplicationRegistry appRegistry,
+                                    String localFQDN)
+    {
+        this(Collections.EMPTY_MAP, Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+    }
+
+
+    /**
+     * Creates a delegate, handing the given properties and locales to
+     * ServerDelegate together with the mechanism names reported by the
+     * registry's authentication manager.
+     *
+     * @param properties properties passed through to ServerDelegate
+     * @param locales locales passed through to ServerDelegate
+     * @param appRegistry registry providing the authentication manager and virtual hosts
+     * @param localFQDN name passed to the authentication manager when creating a SaslServer
+     */
+    public ServerConnectionDelegate(Map properties,
+                                    List locales,
+                                    IApplicationRegistry appRegistry,
+                                    String localFQDN)
+    {
+        super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
+        _appRegistry = appRegistry;
+        _localFQDN = localFQDN;
+    }
+
+    /**
+     * Splits a space-separated string of mechanism names into a list.
+     *
+     * @param mechanisms mechanism names separated by spaces
+     * @return the names in the order they appear in the string
+     */
+    private static List parseToList(String mechanisms)
+    {
+        List list = new ArrayList();
+        StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+        while(tokenizer.hasMoreTokens())
+        {
+            list.add(tokenizer.nextToken());
+        }
+        return list;
+    }
+
+    /**
+     * Creates the broker-side session for a session attach. The session is
+     * named after the attach and is given a new ServerSessionDelegate.
+     */
+    @Override public ServerSession getSession(Connection conn, SessionAttach atc)
+    {
+        SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+
+        return new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
+    }
+
+    /**
+     * Obtains a SaslServer for the given mechanism from the authentication manager.
+     *
+     * @param mechanism the SASL mechanism name
+     * @return the SaslServer returned by the authentication manager
+     * @throws SaslException propagated from the authentication manager
+     */
+    @Override
+    protected SaslServer createSaslServer(String mechanism) throws SaslException
+    {
+        return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
+    }
+
+    /**
+     * Handles connection open by looking up the requested virtual host (the
+     * empty name is used when the peer names none). When the host exists it is
+     * set on the connection, a ConnectionOpenOk is invoked and the state becomes
+     * OPEN; otherwise a ConnectionClose with INVALID_PATH is invoked and the
+     * state becomes CLOSING.
+     */
+    @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+    {
+        ServerConnection sconn = (ServerConnection) conn;
+
+        String vhostName;
+        if(open.hasVirtualHost())
+        {
+            vhostName = open.getVirtualHost();
+        }
+        else
+        {
+            vhostName = "";
+        }
+        VirtualHost vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
+
+        if(vhost != null)
+        {
+            sconn.setVirtualHost(vhost);
+
+            sconn.invoke(new ConnectionOpenOk(Collections.EMPTY_LIST));
+
+            sconn.setState(Connection.State.OPEN);
+        }
+        else
+        {
+            sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
+            sconn.setState(Connection.State.CLOSING);
+        }
+    }
+}

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+
+/**
+ * Broker-side session. Adds to Session the ability to enqueue a message on a
+ * list of queues.
+ */
+public class ServerSession extends Session
+{
+    ServerSession(Connection connection, Binary name, long expiry)
+    {
+        super(connection, name, expiry);
+    }
+
+    ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+    {
+        super(connection, delegate, name, expiry);
+    }
+
+    /**
+     * Enqueues the message on each of the given queues, in list order.
+     * If a queue throws AMQException the remaining queues are skipped and the
+     * stack trace is printed; the exception is not propagated to the caller.
+     *
+     * @param message the message to enqueue
+     * @param queues the queues to enqueue the message on
+     */
+    public void enqueue(ServerMessage message, ArrayList<AMQQueue> queues)
+    {
+        // TODO Txn
+
+        try
+        {
+            for(AMQQueue q : queues)
+            {
+                q.enqueue(message);
+            }
+        }
+        catch (AMQException e)
+        {
+            // TODO: report the failure instead of only printing it
+            e.printStackTrace();
+        }
+    }
+}

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,402 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+
+/**
+ * Session-level delegate for the broker. Message transfer, passive exchange
+ * declare and exchange-bound queries are handled here against the exchange
+ * and queue registries of the connection's virtual host; every other command
+ * is passed straight through to SessionDelegate.
+ */
+public class ServerSessionDelegate extends SessionDelegate
+{
+    private final IApplicationRegistry _appRegistry;
+
+    /**
+     * @param appRegistry the application registry; stored but not otherwise
+     *                    used by this class yet
+     */
+    public ServerSessionDelegate(IApplicationRegistry appRegistry)
+    {
+        _appRegistry = appRegistry;
+    }
+
+    // Message commands other than transfer simply defer to SessionDelegate.
+
+    @Override
+    public void messageAccept(Session session, MessageAccept method)
+    {
+        super.messageAccept(session, method);
+    }
+
+    @Override
+    public void messageReject(Session session, MessageReject method)
+    {
+        super.messageReject(session, method);
+    }
+
+    @Override
+    public void messageRelease(Session session, MessageRelease method)
+    {
+        super.messageRelease(session, method);
+    }
+
+    @Override
+    public void messageAcquire(Session session, MessageAcquire method)
+    {
+        super.messageAcquire(session, method);
+    }
+
+    @Override
+    public void messageResume(Session session, MessageResume method)
+    {
+        super.messageResume(session, method);
+    }
+
+    @Override
+    public void messageSubscribe(Session session, MessageSubscribe method)
+    {
+        super.messageSubscribe(session, method);
+    }
+
+    /**
+     * Routes an incoming transfer through its destination exchange (the
+     * registry's default exchange when no destination is given), enqueues the
+     * message on the resulting queues and marks the transfer as processed.
+     * When the exchange lookup returns null, a NOT_FOUND execution exception
+     * is invoked on the session and the session is closed, in the same way as
+     * a failed passive exchange declare. SessionDelegate.messageTransfer is
+     * called afterwards in either case.
+     */
+    @Override
+    public void messageTransfer(Session ssn, MessageTransfer xfr)
+    {
+        ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+        Exchange exchange;
+        if(xfr.hasDestination())
+        {
+            exchange = exchangeRegistry.getExchange(xfr.getDestination());
+        }
+        else
+        {
+            exchange = exchangeRegistry.getDefaultExchange();
+        }
+
+        if(exchange == null)
+        {
+            // Without this check exchange.route() below fails with a NullPointerException.
+            ExecutionException ex = new ExecutionException();
+            ex.setErrorCode(ExecutionErrorCode.NOT_FOUND);
+            ex.setCommandId(xfr.getId());
+
+            ex.setDescription("not-found: exchange-name '"+xfr.getDestination()+"'");
+
+            ssn.invoke(ex);
+            ssn.close();
+        }
+        else
+        {
+            MessageTransferMessage message = new MessageTransferMessage(xfr);
+            try
+            {
+                ArrayList<AMQQueue> queues = exchange.route(message);
+
+                ((ServerSession) ssn).enqueue(message, queues);
+
+                ssn.processed(xfr);
+            }
+            catch (AMQException e)
+            {
+                // TODO: report routing failures instead of only printing them
+                e.printStackTrace();
+            }
+        }
+
+        super.messageTransfer(ssn, xfr);
+    }
+
+    @Override
+    public void messageCancel(Session session, MessageCancel method)
+    {
+        super.messageCancel(session, method);
+    }
+
+    @Override
+    public void messageFlush(Session session, MessageFlush method)
+    {
+        super.messageFlush(session, method);
+    }
+
+    // Transaction (tx / dtx) commands simply defer to SessionDelegate.
+
+    @Override
+    public void txSelect(Session session, TxSelect method)
+    {
+        super.txSelect(session, method);
+    }
+
+    @Override
+    public void txCommit(Session session, TxCommit method)
+    {
+        super.txCommit(session, method);
+    }
+
+    @Override
+    public void txRollback(Session session, TxRollback method)
+    {
+        super.txRollback(session, method);
+    }
+
+    @Override
+    public void dtxSelect(Session session, DtxSelect method)
+    {
+        super.dtxSelect(session, method);
+    }
+
+    @Override
+    public void dtxStart(Session session, DtxStart method)
+    {
+        super.dtxStart(session, method);
+    }
+
+    @Override
+    public void dtxEnd(Session session, DtxEnd method)
+    {
+        super.dtxEnd(session, method);
+    }
+
+    @Override
+    public void dtxCommit(Session session, DtxCommit method)
+    {
+        super.dtxCommit(session, method);
+    }
+
+    @Override
+    public void dtxForget(Session session, DtxForget method)
+    {
+        super.dtxForget(session, method);
+    }
+
+    @Override
+    public void dtxGetTimeout(Session session, DtxGetTimeout method)
+    {
+        super.dtxGetTimeout(session, method);
+    }
+
+    @Override
+    public void dtxPrepare(Session session, DtxPrepare method)
+    {
+        super.dtxPrepare(session, method);
+    }
+
+    @Override
+    public void dtxRecover(Session session, DtxRecover method)
+    {
+        super.dtxRecover(session, method);
+    }
+
+    @Override
+    public void dtxRollback(Session session, DtxRollback method)
+    {
+        super.dtxRollback(session, method);
+    }
+
+    @Override
+    public void dtxSetTimeout(Session session, DtxSetTimeout method)
+    {
+        super.dtxSetTimeout(session, method);
+    }
+
+    /**
+     * Handles exchange declare. For a passive declare of an exchange that is
+     * not in the registry, a NOT_FOUND execution exception is invoked on the
+     * session and the session is closed. Non-passive declares are still TODO.
+     * SessionDelegate.exchangeDeclare is called afterwards in every case.
+     */
+    @Override
+    public void exchangeDeclare(Session session, ExchangeDeclare method)
+    {
+        String exchangeName = method.getExchange();
+
+        Exchange exchange = getExchange(session, exchangeName);
+
+        if(method.getPassive())
+        {
+            if(exchange == null)
+            {
+                ExecutionException ex = new ExecutionException();
+                ex.setErrorCode(ExecutionErrorCode.NOT_FOUND);
+                ex.setCommandId(method.getId());
+
+                ex.setDescription("not-found: exchange-name '"+exchangeName+"'");
+
+                session.invoke(ex);
+                session.close();
+            }
+
+        }
+        else
+        {
+            // TODO
+        }
+        super.exchangeDeclare(session, method);
+    }
+
+    /** Looks up an exchange by name in the registry of the session's virtual host. */
+    private Exchange getExchange(Session session, String exchangeName)
+    {
+        return getExchangeRegistry(session).getExchange(exchangeName);
+    }
+
+    /** Returns the exchange registry of the session's virtual host. */
+    private ExchangeRegistry getExchangeRegistry(Session session)
+    {
+        return getVirtualHost(session).getExchangeRegistry();
+    }
+
+    /** Returns the virtual host set on the session's connection. */
+    private VirtualHost getVirtualHost(Session session)
+    {
+        return getServerConnection(session).getVirtualHost();
+    }
+
+    /** Returns the session's connection, cast to ServerConnection. */
+    private ServerConnection getServerConnection(Session session)
+    {
+        return (ServerConnection) session.getConnection();
+    }
+
+    // Remaining exchange commands other than exchangeBound defer to SessionDelegate.
+
+    @Override
+    public void exchangeDelete(Session session, ExchangeDelete method)
+    {
+        super.exchangeDelete(session, method);
+    }
+
+    @Override
+    public void exchangeQuery(Session session, ExchangeQuery method)
+    {
+        super.exchangeQuery(session, method);
+    }
+
+    @Override
+    public void exchangeBind(Session session, ExchangeBind method)
+    {
+        super.exchangeBind(session, method);
+    }
+
+    @Override
+    public void exchangeUnbind(Session session, ExchangeUnbind method)
+    {
+        super.exchangeUnbind(session, method);
+    }
+
+    /**
+     * Answers an exchange-bound query. The result flags report a missing
+     * exchange or queue and, when both exist, whether the queue (and the
+     * binding key, if one is supplied) is bound to the exchange. With an
+     * exchange and a binding key but no queue, only the key is checked.
+     * Argument matching, and a binding key supplied with a queue but no
+     * exchange, are still TODO. The result is sent as the execution result
+     * for the command before SessionDelegate.exchangeBound is called.
+     */
+    @Override
+    public void exchangeBound(Session session, ExchangeBound method)
+    {
+        ExchangeBoundResult result = new ExchangeBoundResult();
+        if(method.hasExchange())
+        {
+            Exchange exchange = getExchange(session, method.getExchange());
+
+            if(exchange == null)
+            {
+                result.setExchangeNotFound(true);
+            }
+
+            if(method.hasQueue())
+            {
+                AMQQueue queue = getQueue(session, method.getQueue());
+                if(queue == null)
+                {
+                    result.setQueueNotFound(true);
+                }
+
+                if(exchange != null && queue != null)
+                {
+                    if(method.hasBindingKey())
+                    {
+                        if(method.hasArguments())
+                        {
+                            // TODO
+                        }
+                        result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+                    }
+
+                    result.setQueueNotMatched(!exchange.isBound(queue));
+                }
+            }
+            else if(exchange != null && method.hasBindingKey())
+            {
+                if(method.hasArguments())
+                {
+                    // TODO
+                }
+                result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+            }
+        }
+        else if(method.hasQueue())
+        {
+            AMQQueue queue = getQueue(session, method.getQueue());
+            if(queue == null)
+            {
+                result.setQueueNotFound(true);
+            }
+            else
+            {
+                if(method.hasBindingKey())
+                {
+                    if(method.hasArguments())
+                    {
+                        // TODO
+                    }
+
+                    // TODO
+                }
+            }
+        }
+
+        session.executionResult((int) method.getId(), result);
+        super.exchangeBound(session, method);
+    }
+
+    /** Looks up a queue by name in the registry of the session's virtual host. */
+    private AMQQueue getQueue(Session session, String queue)
+    {
+        return getQueueRegistry(session).getQueue(queue);
+    }
+
+    /** Returns the queue registry of the session's virtual host. */
+    private QueueRegistry getQueueRegistry(Session session)
+    {
+        return getVirtualHost(session).getQueueRegistry();
+    }
+
+    // Queue commands simply defer to SessionDelegate.
+
+    @Override
+    public void queueDeclare(Session session, QueueDeclare method)
+    {
+        super.queueDeclare(session, method);
+    }
+
+    @Override
+    public void queueDelete(Session session, QueueDelete method)
+    {
+        super.queueDelete(session, method);
+    }
+
+    @Override
+    public void queuePurge(Session session, QueuePurge method)
+    {
+        super.queuePurge(session, method);
+    }
+
+    @Override
+    public void queueQuery(Session session, QueueQuery method)
+    {
+        super.queueQuery(session, method);
+    }
+}

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org