activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-1051 Make ServerSession send thread safe
Date Tue, 21 Mar 2017 14:27:51 GMT
ARTEMIS-1051 Make ServerSession send thread safe


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8394fec1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8394fec1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8394fec1

Branch: refs/heads/master
Commit: 8394fec104e015c512cb5b323009e535707b4f91
Parents: bfa679c
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Mar 20 17:16:10 2017 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Mar 21 09:14:24 2017 -0400

----------------------------------------------------------------------
 .../core/server/impl/ServerSessionImpl.java     | 157 ++++++++++---------
 1 file changed, 82 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8394fec1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 08464e6..af1c532 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -182,6 +182,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    private Set<Closeable> closeables;
 
+   private final Object sendLock = new Object();
+
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -1290,54 +1292,56 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                              final boolean direct,
                              boolean noAutoCreateQueue) throws Exception {
 
-      // If the protocol doesn't support flow control, we have no choice other than fail the communication
-      if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
-         ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
-         this.getRemotingConnection().fail(exception);
-         throw exception;
-      }
+      synchronized (sendLock) {
+         // If the protocol doesn't support flow control, we have no choice other than fail the communication
+         if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
+            ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
+            this.getRemotingConnection().fail(exception);
+            throw exception;
+         }
 
-      RoutingStatus result = RoutingStatus.OK;
-      //large message may come from StompSession directly, in which
-      //case the id header already generated.
-      if (!message.isLargeMessage()) {
-         long id = storageManager.generateID();
-         // This will re-encode the message
-         message.setMessageID(id);
-      }
+         RoutingStatus result = RoutingStatus.OK;
+         //large message may come from StompSession directly, in which
+         //case the id header already generated.
+         if (!message.isLargeMessage()) {
+            long id = storageManager.generateID();
+            // This will re-encode the message
+            message.setMessageID(id);
+         }
 
-      if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
-         message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
-      }
+         if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
+            message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
+         }
 
-      SimpleString address = message.getAddressSimpleString();
+         SimpleString address = message.getAddressSimpleString();
 
-      if (defaultAddress == null && address != null) {
-         defaultAddress = address;
-      }
+         if (defaultAddress == null && address != null) {
+            defaultAddress = address;
+         }
 
-      if (address == null) {
-         // We don't want to force a re-encode when the message gets sent to the consumer
-         message.setAddress(defaultAddress);
-      }
+         if (address == null) {
+            // We don't want to force a re-encode when the message gets sent to the consumer
+            message.setAddress(defaultAddress);
+         }
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
-      }
+         if (logger.isTraceEnabled()) {
+            logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
+         }
 
-      if (message.getAddress() == null) {
-         // This could happen with some tests that are ignoring messages
-         throw ActiveMQMessageBundle.BUNDLE.noAddress();
-      }
+         if (message.getAddress() == null) {
+            // This could happen with some tests that are ignoring messages
+            throw ActiveMQMessageBundle.BUNDLE.noAddress();
+         }
 
-      if (message.getAddressSimpleString().equals(managementAddress)) {
-         // It's a management message
+         if (message.getAddressSimpleString().equals(managementAddress)) {
+            // It's a management message
 
-         handleManagementMessage(tx, message, direct);
-      } else {
-         result = doSend(tx, message, address, direct, noAutoCreateQueue);
+            handleManagementMessage(tx, message, direct);
+         } else {
+            result = doSend(tx, message, address, direct, noAutoCreateQueue);
+         }
+         return result;
       }
-      return result;
    }
 
 
@@ -1616,55 +1620,58 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                final SimpleString originalAddress,
                                final boolean direct,
                                final boolean noAutoCreateQueue) throws Exception {
-      RoutingStatus result = RoutingStatus.OK;
 
-      RoutingType routingType = msg.getRouteType();
+      synchronized (sendLock) {
+         RoutingStatus result = RoutingStatus.OK;
 
-      /* TODO-now: How to address here with AMQP?
-      if (originalAddress != null) {
-         if (originalAddress.toString().startsWith("anycast:")) {
-            routingType = RoutingType.ANYCAST;
-         } else if (originalAddress.toString().startsWith("multicast:")) {
-            routingType = RoutingType.MULTICAST;
-         }
-      } */
+         RoutingType routingType = msg.getRouteType();
 
-      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
+         /* TODO-now: How to address here with AMQP?
+         if (originalAddress != null) {
+            if (originalAddress.toString().startsWith("anycast:")) {
+               routingType = RoutingType.ANYCAST;
+            } else if (originalAddress.toString().startsWith("multicast:")) {
+               routingType = RoutingType.MULTICAST;
+            }
+         } */
 
-      // Consumer
-      // check the user has write access to this address.
-      try {
-         securityCheck(art.getA(), CheckType.SEND, this);
-      } catch (ActiveMQException e) {
-         if (!autoCommitSends && tx != null) {
-            tx.markAsRollbackOnly(e);
+         Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
+
+         // Consumer
+         // check the user has write access to this address.
+         try {
+            securityCheck(art.getA(), CheckType.SEND, this);
+         } catch (ActiveMQException e) {
+            if (!autoCommitSends && tx != null) {
+               tx.markAsRollbackOnly(e);
+            }
+            throw e;
          }
-         throw e;
-      }
 
-      if (tx == null || autoCommitSends) {
-      } else {
-         routingContext.setTransaction(tx);
-      }
+         if (tx == null || autoCommitSends) {
+         } else {
+            routingContext.setTransaction(tx);
+         }
 
-      try {
-         routingContext.setAddress(art.getA());
-         routingContext.setRoutingType(art.getB());
+         try {
+            routingContext.setAddress(art.getA());
+            routingContext.setRoutingType(art.getB());
 
-         result = postOffice.route(msg, routingContext, direct);
+            result = postOffice.route(msg, routingContext, direct);
 
-         Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
+            Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
 
-         if (value == null) {
-            targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
-         } else {
-            value.setA(msg.getUserID());
-            value.getB().incrementAndGet();
+            if (value == null) {
+               targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
+            } else {
+               value.setA(msg.getUserID());
+               value.getB().incrementAndGet();
+            }
+         } finally {
+            routingContext.clear();
          }
-      } finally {
-         routingContext.clear();
+         return result;
       }
-      return result;
    }
 
    @Override


Mime
View raw message