qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1644374 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/virtualhost/...
Date Wed, 10 Dec 2014 11:08:29 GMT
Author: rgodfrey
Date: Wed Dec 10 11:08:29 2014
New Revision: 1644374

URL: http://svn.apache.org/r1644374
Log:
Force close connection on 0-10 when message sent to closing virtual host

Added:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
  (with props)
Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1644374&r1=1644373&r2=1644374&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
Wed Dec 10 11:08:29 2014
@@ -64,12 +64,12 @@ import org.apache.qpid.server.queue.Base
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public abstract class AbstractExchange<T extends AbstractExchange<T>>
         extends AbstractConfiguredObject<T>
@@ -510,7 +510,7 @@ public abstract class AbstractExchange<T
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
-            throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState()
+ " prevents the message from being sent");
+            throw new VirtualHostUnavailableException(this._virtualHost);
         }
 
         List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
@@ -910,4 +910,5 @@ public abstract class AbstractExchange<T
             this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord());
         }
     }
+
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1644374&r1=1644373&r2=1644374&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Wed Dec 10 11:08:29 2014
@@ -50,6 +50,7 @@ import org.apache.qpid.server.connection
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogMessage;
@@ -2521,7 +2522,7 @@ public abstract class AbstractQueue<X ex
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
-            throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState()
+ " prevents the message from being sent");
+            throw new VirtualHostUnavailableException(this._virtualHost);
         }
 
         if(!message.isReferenced(this))

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java?rev=1644374&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
(added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
Wed Dec 10 11:08:29 2014
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException
+{
+    public VirtualHostUnavailableException(VirtualHostImpl<?, ?, ?> host)
+    {
+        super("Virtualhost state "
+              + host.getState()
+              + " prevents the message from being sent");
+    }
+}

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1644374&r1=1644373&r2=1644374&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
Wed Dec 10 11:08:29 2014
@@ -37,6 +37,7 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -381,58 +382,69 @@ public class ServerSessionDelegate exten
                     new MessageTransferMessage(storeMessage, serverSession.getReference());
             MessageReference<MessageTransferMessage> reference = message.newReference();
 
-            final InstanceProperties instanceProperties = new InstanceProperties()
+            try
             {
-                @Override
-                public Object getProperty(final Property prop)
+                final InstanceProperties instanceProperties = new InstanceProperties()
                 {
-                    switch (prop)
+                    @Override
+                    public Object getProperty(final Property prop)
                     {
-                        case EXPIRATION:
-                            return message.getExpiration();
-                        case IMMEDIATE:
-                            return message.isImmediate();
-                        case MANDATORY:
-                            return (delvProps == null || !delvProps.getDiscardUnroutable())
-                                   && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
-                        case PERSISTENT:
-                            return message.isPersistent();
-                        case REDELIVERED:
-                            return delvProps.getRedelivered();
+                        switch (prop)
+                        {
+                            case EXPIRATION:
+                                return message.getExpiration();
+                            case IMMEDIATE:
+                                return message.isImmediate();
+                            case MANDATORY:
+                                return (delvProps == null || !delvProps.getDiscardUnroutable())
+                                       && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+                            case PERSISTENT:
+                                return message.isPersistent();
+                            case REDELIVERED:
+                                return delvProps.getRedelivered();
+                        }
+                        return null;
                     }
-                    return null;
-                }
-            };
+                };
 
-            int enqueues = serverSession.enqueue(message, instanceProperties, destination);
+                int enqueues = serverSession.enqueue(message, instanceProperties, destination);
 
-            if (enqueues == 0)
-            {
-                if ((delvProps == null || !delvProps.getDiscardUnroutable())
-                    && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                if (enqueues == 0)
                 {
-                    RangeSet rejects = RangeSetFactory.createRangeSet();
-                    rejects.add(xfr.getId());
-                    MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE,
"Unroutable");
-                    ssn.invoke(reject);
+                    if ((delvProps == null || !delvProps.getDiscardUnroutable())
+                        && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                    {
+                        RangeSet rejects = RangeSetFactory.createRangeSet();
+                        rejects.add(xfr.getId());
+                        MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE,
"Unroutable");
+                        ssn.invoke(reject);
+                    }
+                    else
+                    {
+                        virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
+                                                                                        
messageMetaData.getRoutingKey()));
+                    }
+                }
+
+                if (serverSession.isTransactional())
+                {
+                    serverSession.processed(xfr);
                 }
                 else
                 {
-                    virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
-                                                                                     messageMetaData.getRoutingKey()));
+                    serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+                                               new CommandProcessedAction(serverSession,
xfr));
                 }
             }
-
-            if (serverSession.isTransactional())
+            catch (VirtualHostUnavailableException e)
             {
-                serverSession.processed(xfr);
+                getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage());
             }
-            else
+            finally
             {
-                serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
-                                           new CommandProcessedAction(serverSession, xfr));
+                reference.release();
             }
-            reference.release();
+
         }
     }
 
@@ -549,7 +561,7 @@ public class ServerSessionDelegate exten
         {
             try
             {
-                ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+                ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
             }
             catch (TimeoutDtxException e)
             {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message