qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1725700 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/security/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol...
Date Wed, 20 Jan 2016 11:04:44 GMT
Author: rgodfrey
Date: Wed Jan 20 11:04:43 2016
New Revision: 1725700

URL: http://svn.apache.org/viewvc?rev=1725700&view=rev
Log:
QPID-7008 : Add message userId verification to AMQP 0-10 and 1.0 protocols

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
Wed Jan 20 11:04:43 2016
@@ -85,7 +85,7 @@ public interface Connection<X extends Co
     Protocol getProtocol();
 
     @DerivedAttribute
-    VirtualHost getVirtualHost();
+    VirtualHost<?> getVirtualHost();
 
     @DerivedAttribute
     Port<?> getPort();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
Wed Jan 20 11:04:43 2016
@@ -659,8 +659,21 @@ public class SecurityManager
         }
     }
 
-    public void authorisePublish(final boolean immediate, String routingKey, String exchangeName, String virtualHostName, Subject currentSubject)
+    public void authorisePublish(final boolean immediate,
+                                 String routingKey,
+                                 String exchangeName,
+                                 String virtualHostName,
+                                 Subject currentSubject,
+                                 final String messageUserId,
+                                 final AMQPConnection<?> connection)
     {
+        if(!connection.isAuthorizedMessagePrincipal(messageUserId))
+        {
+            throw new AccessControlException("The user id of the message '"
+                                             + messageUserId
+                                             + "' is not valid on a connection authenticated as  "
+                                             + connection.getAuthorizedPrincipal().getName());
+        }
         PublishAccessCheckCacheEntry key = new PublishAccessCheckCacheEntry(immediate, routingKey, exchangeName, virtualHostName);
         PublishAccessCheck check = _publishAccessCheckCache.get(key);
         if (check == null)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
Wed Jan 20 11:04:43 2016
@@ -69,4 +69,6 @@ public interface AMQPConnection<C extend
     void reserveOutboundMessageSpace(long size);
 
     boolean isIOThread();
+
+    boolean isAuthorizedMessagePrincipal(String messageUserId);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Wed Jan 20 11:04:43 2016
@@ -42,6 +42,7 @@ import com.google.common.util.concurrent
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -104,6 +105,9 @@ public abstract class AbstractAMQPConnec
     private volatile AccessControlContext _accessControllerContext;
     private volatile Thread _ioThread;
 
+    private boolean _messageAuthorizationRequired;
+
+
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
                                   AmqpPort<?> port,
@@ -122,6 +126,7 @@ public abstract class AbstractAMQPConnec
         _connectionId = connectionId;
         _aggregateTicker = aggregateTicker;
         _subject.getPrincipals().add(new ConnectionPrincipal(this));
+
         updateAccessControllerContext();
 
         _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
@@ -472,9 +477,10 @@ public abstract class AbstractAMQPConnec
     {
     }
 
-    public void virtualHostAssociated()
+    final public void virtualHostAssociated()
     {
         getVirtualHost().registerConnection(this);
+        _messageAuthorizationRequired = getVirtualHost().getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH);
     }
 
     @Override
@@ -664,6 +670,11 @@ public abstract class AbstractAMQPConnec
 
     protected abstract EventLogger getEventLogger();
 
+    public final boolean isAuthorizedMessagePrincipal(final String userId)
+    {
+        return !_messageAuthorizationRequired || getAuthorizedPrincipal().getName().equals(userId == null? "" : userId);
+    }
+
     private class SlowConnectionOpenTicker implements Ticker
     {
         private final long _allowedTime;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
Wed Jan 20 11:04:43 2016
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.nio.charset.StandardCharsets;
 import java.security.AccessControlException;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -377,6 +378,12 @@ public class ServerSessionDelegate exten
         return queue.verifySessionAccess(session);
     }
 
+    private static String getMessageUserId(MessageTransfer xfr)
+    {
+        byte[] userIdBytes = xfr.getHeader() == null ? null : xfr.getHeader().getMessageProperties() == null ? null : xfr.getHeader().getMessageProperties().getUserId();
+        return userIdBytes == null ? null : new String(userIdBytes, StandardCharsets.UTF_8);
+    }
+
     @Override
     public void messageTransfer(Session ssn, final MessageTransfer xfr)
     {
@@ -416,7 +423,9 @@ public class ServerSessionDelegate exten
                                               messageMetaData.getRoutingKey(),
                                               destination.getName(),
                                               virtualHost.getName(),
-                                              serverSession.getAuthorizedSubject());
+                                              serverSession.getAuthorizedSubject(),
+                                              getMessageUserId(xfr),
+                                              serverSession.getAMQPConnection());
                 }
                 catch (AccessControlException e)
                 {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Wed Jan 20 11:04:43 2016
@@ -128,8 +128,6 @@ public class AMQChannel
     private final DefaultQueueAssociationClearingTask
             _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
 
-    private final boolean _messageAuthorizationRequired;
-
     private final int _channelId;
 
 
@@ -242,7 +240,6 @@ public class AMQChannel
         _accessControllerContext = org.apache.qpid.server.security.SecurityManager.getAccessControlContextFromSubject(_subject);
 
         _maxUncommittedInMemorySize = connection.getVirtualHost().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
-        _messageAuthorizationRequired = connection.getVirtualHost().getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH);
         _logSubject = new ChannelLogSubject(this);
 
         _messageStore = messageStore;
@@ -397,12 +394,6 @@ public class AMQChannel
 
     public void setPublishFrame(MessagePublishInfo info, final MessageDestination e)
     {
-        String routingKey = AMQShortString.toString(info.getRoutingKey());
-        VirtualHost<?> virtualHost = getVirtualHost();
-        SecurityManager securityManager = virtualHost.getSecurityManager();
-
-        securityManager.authorisePublish(info.isImmediate(), routingKey, e.getName(), virtualHost.getName(), _subject);
-
         _currentMessage = new IncomingMessage(info);
         _currentMessage.setMessageDestination(e);
     }
@@ -424,59 +415,63 @@ public class AMQChannel
         // check and deliver if header says body length is zero
         if (_currentMessage.allContentReceived())
         {
-            if(_confirmOnPublish)
-            {
-                _confirmedMessageCounter++;
-            }
-            Runnable finallyAction = null;
-            ContentHeaderBody contentHeader = _currentMessage.getContentHeader();
-
-            long bodySize = _currentMessage.getSize();
-            long timestamp = contentHeader.getProperties().getTimestamp();
+            MessagePublishInfo info = _currentMessage.getMessagePublishInfo();
+            String routingKey = AMQShortString.toString(info.getRoutingKey());
+            VirtualHost<?> virtualHost = getVirtualHost();
 
+            SecurityManager securityManager = virtualHost.getSecurityManager();
             try
             {
+                ContentHeaderBody contentHeader = _currentMessage.getContentHeader();
+                securityManager.authorisePublish(info.isImmediate(),
+                                                 routingKey,
+                                                 _currentMessage.getDestination().getName(),
+                                                 virtualHost.getName(),
+                                                 _subject,
+                                                 AMQShortString.toString(contentHeader.getProperties().getUserId()),
+                                                 _connection);
 
-                final MessagePublishInfo messagePublishInfo = _currentMessage.getMessagePublishInfo();
-                final MessageDestination destination = _currentMessage.getDestination();
-
-                final MessageMetaData messageMetaData =
-                        new MessageMetaData(messagePublishInfo,
-                                            contentHeader,
-                                            getConnection().getLastReadTime());
-
-                final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
-                int bodyCount = _currentMessage.getBodyCount();
-                if(bodyCount > 0)
+                if (_confirmOnPublish)
                 {
-                    long bodyLengthReceived = 0;
-                    for(int i = 0 ; i < bodyCount ; i++)
-                    {
-                        ContentBody contentChunk = _currentMessage.getContentChunk(i);
-                        handle.addContent(contentChunk.getPayload());
-                        bodyLengthReceived += contentChunk.getSize();
-                        contentChunk.dispose();
-                    }
+                    _confirmedMessageCounter++;
                 }
-                final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
+                Runnable finallyAction = null;
+
+                long bodySize = _currentMessage.getSize();
+                long timestamp = contentHeader.getProperties().getTimestamp();
 
-                final AMQMessage amqMessage = createAMQMessage(storedMessage);
-                MessageReference reference = amqMessage.newReference();
                 try
                 {
 
-                    _currentMessage = null;
+                    final MessagePublishInfo messagePublishInfo = _currentMessage.getMessagePublishInfo();
+                    final MessageDestination destination = _currentMessage.getDestination();
 
-                    if(!checkMessageUserId(contentHeader))
+                    final MessageMetaData messageMetaData =
+                            new MessageMetaData(messagePublishInfo,
+                                                contentHeader,
+                                                getConnection().getLastReadTime());
+
+                    final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+                    int bodyCount = _currentMessage.getBodyCount();
+                    if (bodyCount > 0)
                     {
-                        if(_confirmOnPublish)
+                        for (int i = 0; i < bodyCount; i++)
                         {
-                            _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false)));
+                            ContentBody contentChunk = _currentMessage.getContentChunk(i);
+                            handle.addContent(contentChunk.getPayload());
+                            contentChunk.dispose();
                         }
-                        _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
                     }
-                    else
+                    final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
+
+                    final AMQMessage amqMessage = createAMQMessage(storedMessage);
+                    MessageReference reference = amqMessage.newReference();
+                    try
                     {
+
+                        _currentMessage = null;
+
+
                         final boolean immediate = messagePublishInfo.isImmediate();
 
                         final InstanceProperties instanceProperties =
@@ -485,7 +480,7 @@ public class AMQChannel
                                     @Override
                                     public Object getProperty(final Property prop)
                                     {
-                                        switch(prop)
+                                        switch (prop)
                                         {
                                             case EXPIRATION:
                                                 return amqMessage.getExpiration();
@@ -507,13 +502,13 @@ public class AMQChannel
                                                         instanceProperties, _transaction,
                                                         immediate ? _immediateAction : _capacityCheckAction
                                                        );
-                        if(enqueues == 0)
+                        if (enqueues == 0)
                         {
                             finallyAction = handleUnroutableMessage(amqMessage);
                         }
                         else
                         {
-                            if(_confirmOnPublish)
+                            if (_confirmOnPublish)
                             {
                                 BasicAckBody responseBody = _connection.getMethodRegistry()
                                         .createBasicAckBody(_confirmedMessageCounter, false);
@@ -522,23 +517,29 @@ public class AMQChannel
                             incrementUncommittedMessageSize(storedMessage);
                             incrementOutstandingTxnsIfNecessary();
                         }
+
+                    }
+                    finally
+                    {
+                        reference.release();
+                        if (finallyAction != null)
+                        {
+                            finallyAction.run();
+                        }
                     }
+
                 }
                 finally
                 {
-                    reference.release();
-                    if(finallyAction != null)
-                    {
-                        finallyAction.run();
-                    }
+                    _connection.registerMessageReceived(bodySize, timestamp);
+                    _currentMessage = null;
                 }
-
             }
-            finally
+            catch (AccessControlException e)
             {
-                _connection.registerMessageReceived(bodySize, timestamp);
-                _currentMessage = null;
+                _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
             }
+
         }
 
     }
@@ -1372,9 +1373,7 @@ public class AMQChannel
 
     private boolean checkMessageUserId(ContentHeaderBody header)
     {
-        AMQShortString userID = header.getProperties().getUserId();
-        return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
-
+        return _connection.isAuthorizedMessagePrincipal(AMQShortString.toString(header.getProperties().getUserId()));
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
Wed Jan 20 11:04:43 2016
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -114,7 +115,6 @@ public class AMQChannelTest extends Qpid
         when(_amqConnection.getProtocolOutputConverter()).thenReturn(_protocolOutputConverter);
         when(_amqConnection.getBroker()).thenReturn((Broker) _broker);
         when(_amqConnection.getMethodRegistry()).thenReturn(new MethodRegistry(ProtocolVersion.v0_9));
-
         _messageDestination = mock(MessageDestination.class);
     }
 
@@ -168,7 +168,7 @@ public class AMQChannelTest extends Qpid
     public void testPublishContentHeaderWhenMessageAuthorizationFails() throws Exception
     {
         when(_virtualHost.getDefaultDestination()).thenReturn(mock(MessageDestination.class));
-        when(_virtualHost.getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH)).thenReturn(true);
+        when(_virtualHost.getContextValue(eq(Boolean.class), eq(Broker.BROKER_MSG_AUTH))).thenReturn(true);
         when(_virtualHost.getMessageStore()).thenReturn(new NullMessageStore()
         {
             @Override
@@ -184,6 +184,7 @@ public class AMQChannelTest extends Qpid
                                                         authenticatedUser,
                                                         Collections.<Principal>emptySet(),
                                                         Collections.<Principal>emptySet()));
+        _amqConnection.virtualHostAssociated();
 
         int channelId = 1;
         AMQChannel channel = new AMQChannel(_amqConnection, channelId, _virtualHost.getMessageStore());
@@ -193,12 +194,7 @@ public class AMQChannelTest extends Qpid
         channel.receiveBasicPublish(AMQShortString.EMPTY_STRING, AMQShortString.EMPTY_STRING, false, false);
         channel.receiveMessageHeader(properties, 0);
 
-        verify(_protocolOutputConverter).writeReturn(any(MessagePublishInfo.class),
-                                                     any(ContentHeaderBody.class),
-                                                     any(MessageContentSource.class),
-                                                     eq(channelId),
-                                                     eq(AMQConstant.ACCESS_REFUSED.getCode()),
-                                                     eq(AMQShortString.valueOf("Access Refused")));
+        verify(_amqConnection).sendConnectionClose(eq(AMQConstant.ACCESS_REFUSED), anyString(), eq(channelId));
         verifyZeroInteractions(_messageDestination);
     }
 
@@ -218,6 +214,7 @@ public class AMQChannelTest extends Qpid
 
         Set<Principal> authenticatedUser = Collections.<Principal>singleton(new AuthenticatedPrincipal("user"));
         _amqConnection.setAuthorizedSubject(new Subject(true, authenticatedUser, Collections.<Principal>emptySet(), Collections.<Principal>emptySet()));
+        _amqConnection.virtualHostAssociated();
 
         AMQChannel channel = new AMQChannel(_amqConnection, 1, _virtualHost.getMessageStore());
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
Wed Jan 20 11:04:43 2016
@@ -34,17 +34,19 @@ public class ExchangeDestination impleme
     private static final Accepted ACCEPTED = new Accepted();
     public static final Rejected REJECTED = new Rejected();
     private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
+    private final String _address;
 
     private Exchange<?> _exchange;
     private TerminusDurability _durability;
     private TerminusExpiryPolicy _expiryPolicy;
     private String _initialRoutingAddress;
 
-    public ExchangeDestination(Exchange<?> exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
+    public ExchangeDestination(Exchange<?> exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy, String address)
     {
         _exchange = exchange;
         _durability = durable;
         _expiryPolicy = expiryPolicy;
+        _address = address;
     }
 
     public Outcome[] getOutcomes()
@@ -77,6 +79,25 @@ public class ExchangeDestination impleme
                     return null;
                 }};
 
+        int enqueues = _exchange.send(message,
+                                      getRoutingAddress(message),
+                                      instanceProperties,
+                                      txn,
+                                      null);
+
+
+        return enqueues == 0 ? REJECTED : ACCEPTED;
+    }
+
+    @Override
+    public String getAddress()
+    {
+        return _address;
+    }
+
+    @Override
+    public String getRoutingAddress(final Message_1_0 message)
+    {
         String routingAddress;
         MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
         if(_initialRoutingAddress == null)
@@ -115,14 +136,7 @@ public class ExchangeDestination impleme
                 routingAddress = _initialRoutingAddress;
             }
         }
-        int enqueues = _exchange.send(message,
-                                      routingAddress,
-                                      instanceProperties,
-                                      txn,
-                                      null);
-
-
-        return enqueues == 0 ? REJECTED : ACCEPTED;
+        return routingAddress;
     }
 
     TerminusDurability getDurability()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
Wed Jan 20 11:04:43 2016
@@ -38,12 +38,17 @@ public class NodeReceivingDestination im
     private MessageDestination _destination;
     private TerminusDurability _durability;
     private TerminusExpiryPolicy _expiryPolicy;
+    private final String _address;
 
-    public NodeReceivingDestination(MessageDestination destination, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
+    public NodeReceivingDestination(MessageDestination destination,
+                                    TerminusDurability durable,
+                                    TerminusExpiryPolicy expiryPolicy,
+                                    final String address)
     {
         _destination = destination;
         _durability = durable;
         _expiryPolicy = expiryPolicy;
+        _address = address;
     }
 
     public Outcome[] getOutcomes()
@@ -77,8 +82,25 @@ public class NodeReceivingDestination im
                 }};
 
         String routingAddress;
+        routingAddress = getRoutingAddress(message);
+
+        int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null);
+
+
+        return enqueues == 0 ? REJECTED : ACCEPTED;
+    }
+
+    @Override
+    public String getAddress()
+    {
+        return _address;
+    }
+
+    @Override
+    public String getRoutingAddress(final Message_1_0 message)
+    {
         MessageMetaData_1_0.MessageHeader_1_0 messageHeader = message.getMessageHeader();
-        routingAddress = messageHeader.getSubject();
+        String routingAddress = messageHeader.getSubject();
         if(routingAddress == null)
         {
             if (messageHeader.getHeader("routing-key") instanceof String)
@@ -99,11 +121,7 @@ public class NodeReceivingDestination im
                 routingAddress = "";
             }
         }
-
-        int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null);
-
-
-        return enqueues == 0 ? REJECTED : ACCEPTED;
+        return routingAddress;
     }
 
     TerminusDurability getDurability()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
Wed Jan 20 11:04:43 2016
@@ -31,11 +31,13 @@ public class QueueDestination extends Me
 {
     private static final Accepted ACCEPTED = new Accepted();
     private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
+    private final String _address;
 
 
-    public QueueDestination(Queue<?> queue)
+    public QueueDestination(Queue<?> queue, final String address)
     {
         super(queue);
+        _address = address;
     }
 
     public Outcome[] getOutcomes()
@@ -84,4 +86,15 @@ public class QueueDestination extends Me
         return (Queue<?>) super.getQueue();
     }
 
+    @Override
+    public String getRoutingAddress(Message_1_0 message)
+    {
+        return "";
+    }
+
+    @Override
+    public String getAddress()
+    {
+        return _address;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java	(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java	Wed Jan 20 11:04:43 2016
@@ -32,4 +32,8 @@ public interface ReceivingDestination ex
     Outcome send(Message_1_0 message, ServerTransaction txn);
 
     int getCredit();
+
+    String getRoutingAddress(Message_1_0 message);
+
+    String getAddress();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java	(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java	Wed Jan 20 11:04:43 2016
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,12 +38,16 @@ import org.apache.qpid.amqp_1_0.type.Uns
 import org.apache.qpid.amqp_1_0.type.messaging.Target;
 import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
 import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -171,74 +176,102 @@ public class ReceivingLink_1_0 implement
             fragments = null;
 
             MessageReference<Message_1_0> reference = message.newReference();
-
-            Binary transactionId = null;
-            if(xfrState != null)
+            try
             {
-                if(xfrState instanceof TransactionalState)
+                Binary transactionId = null;
+                if (xfrState != null)
                 {
-                    transactionId = ((TransactionalState)xfrState).getTxnId();
+                    if (xfrState instanceof TransactionalState)
+                    {
+                        transactionId = ((TransactionalState) xfrState).getTxnId();
+                    }
                 }
-            }
-
-            ServerTransaction transaction = null;
-            if(transactionId != null)
-            {
-                transaction = getSession().getTransaction(transactionId);
-            }
-            else
-            {
-                Session_1_0 session = getSession();
-                transaction = session != null ? session.getTransaction(null) : new AutoCommitTransaction(_vhost.getMessageStore());
-            }
 
-            Outcome outcome = _destination.send(message, transaction);
-
-            DeliveryState resultantState;
+                ServerTransaction transaction = null;
+                if (transactionId != null)
+                {
+                    transaction = getSession().getTransaction(transactionId);
+                }
+                else
+                {
+                    Session_1_0 session = getSession();
+                    transaction = session != null
+                            ? session.getTransaction(null)
+                            : new AutoCommitTransaction(_vhost.getMessageStore());
+                }
 
-            if(transactionId == null)
-            {
-                resultantState = (DeliveryState) outcome;
-            }
-            else
-            {
-                TransactionalState transactionalState = new TransactionalState();
-                transactionalState.setOutcome(outcome);
-                transactionalState.setTxnId(transactionId);
-                resultantState = transactionalState;
+                final SecurityManager securityManager = _vhost.getSecurityManager();
+                try
+                {
+                    securityManager.authorisePublish(false,
+                                                     _destination.getRoutingAddress(message),
+                                                     _destination.getAddress(),
+                                                     _vhost.getName(),
+                                                     _attachment.getSession().getSubject(),
+                                                     message.getMessageHeader().getUserId(),
+                                                     _attachment.getSession().getAMQPConnection());
 
-            }
+                    Outcome outcome = _destination.send(message, transaction);
 
+                    DeliveryState resultantState;
 
-            boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+                    if (transactionId == null)
+                    {
+                        resultantState = (DeliveryState) outcome;
+                    }
+                    else
+                    {
+                        TransactionalState transactionalState = new TransactionalState();
+                        transactionalState.setOutcome(outcome);
+                        transactionalState.setTxnId(transactionId);
+                        resultantState = transactionalState;
 
-            if(!settled)
-            {
-                _unsettledMap.put(deliveryTag, outcome);
-            }
+                    }
 
-            getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
 
-            getSession().getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
+                    boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(
+                            getReceivingSettlementMode()                                                             );
 
-            if(!(transaction instanceof AutoCommitTransaction))
-            {
-                ServerTransaction.Action a;
-                transaction.addPostTransactionAction(new ServerTransaction.Action()
-                {
-                    public void postCommit()
+                    if (!settled)
                     {
-                        getEndpoint().updateDisposition(deliveryTag, null, true);
+                        _unsettledMap.put(deliveryTag, outcome);
                     }
 
-                    public void onRollback()
+                    getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
+
+                    getSession().getAMQPConnection()
+                            .registerMessageReceived(message.getSize(), message.getArrivalTime());
+
+                    if (!(transaction instanceof AutoCommitTransaction))
                     {
-                        getEndpoint().updateDisposition(deliveryTag, null, true);
+                        ServerTransaction.Action a;
+                        transaction.addPostTransactionAction(new ServerTransaction.Action()
+                        {
+                            public void postCommit()
+                            {
+                                getEndpoint().updateDisposition(deliveryTag, null, true);
+                            }
+
+                            public void onRollback()
+                            {
+                                getEndpoint().updateDisposition(deliveryTag, null, true);
+                            }
+                        });
                     }
-                });
-            }
+                }
+                catch (AccessControlException e)
+                {
+                    final Error err = new Error();
+                    err.setCondition(AmqpError.NOT_ALLOWED);
+                    err.setDescription(e.getMessage());
+                    _attachment.getEndpoint().close(err);
 
-            reference.release();
+                }
+            }
+            finally
+            {
+                reference.release();
+            }
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java	(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java	Wed Jan 20 11:04:43 2016
@@ -323,7 +323,7 @@ public class SendingLink_1_0 implements
 
                 source.setDistributionMode(StdDistMode.COPY);
 
-                qd = new QueueDestination(queue);
+                qd = new QueueDestination(queue, name);
             }
             catch (QueueExistsException e)
             {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1725700&r1=1725699&r2=1725700&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java	(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java	Wed Jan 20 11:04:43 2016
@@ -176,7 +176,7 @@ public class Session_1_0 implements Sess
                         if(exchg != null)
                         {
                             ExchangeDestination exchangeDestination =
-                                    new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+                                    new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy(), parts[0]);
                             exchangeDestination.setInitialRoutingAddress(parts[1]);
                             destination = exchangeDestination;
 
@@ -199,7 +199,7 @@ public class Session_1_0 implements Sess
                             Exchange<?> exchg = getVirtualHost().getAttainedChildFromAddress(Exchange.class, addr);
                             if(exchg != null)
                             {
-                                destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+                                destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy(), addr);
                             }
                             else
                             {
@@ -335,7 +335,7 @@ public class Session_1_0 implements Sess
                         {
                             MessageDestination messageDestination = getVirtualHost().getDefaultDestination();
                             destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                       target.getExpiryPolicy());
+                                                                       target.getExpiryPolicy(), "");
                         }
                         else if(!addr.startsWith("/") && addr.contains("/"))
                         {
@@ -346,7 +346,8 @@ public class Session_1_0 implements Sess
                                 ExchangeDestination exchangeDestination =
                                         new ExchangeDestination(exchange,
                                                                 target.getDurable(),
-                                                                target.getExpiryPolicy());
+                                                                target.getExpiryPolicy(),
+                                                                parts[0]);
 
                                 exchangeDestination.setInitialRoutingAddress(parts[1]);
 
@@ -365,7 +366,7 @@ public class Session_1_0 implements Sess
                             if(messageDestination != null)
                             {
                                 destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                           target.getExpiryPolicy());
+                                                                           target.getExpiryPolicy(), addr);
                             }
                             else
                             {
@@ -373,7 +374,7 @@ public class Session_1_0 implements Sess
                                 if(queue != null)
                                 {
 
-                                    destination = new QueueDestination(queue);
+                                    destination = new QueueDestination(queue, addr);
                                 }
                                 else
                                 {




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message