activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [4/6] activemq-artemis git commit: ARTEMIS-1118 IO callbacks on AMQP
Date Tue, 18 Apr 2017 17:06:34 GMT
ARTEMIS-1118 IO callbacks on AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/31d78edd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/31d78edd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/31d78edd

Branch: refs/heads/master
Commit: 31d78eddf1e74b5b846442d0738ccbf1aa42e7d3
Parents: 807e4e5
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Apr 17 23:21:43 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Apr 18 11:49:25 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/utils/RunnableEx.java      |  22 +++
 .../amqp/broker/AMQPSessionCallback.java        | 168 +++++++++++--------
 .../client/AMQPClientConnectionFactory.java     |   2 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  18 +-
 .../transaction/ProtonTransactionHandler.java   |  62 ++++---
 5 files changed, 181 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java
new file mode 100644
index 0000000..426cfa2
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+public interface RunnableEx {
+   void run() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 9e54d41..08ea959 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -53,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.RunnableEx;
 import org.apache.activemq.artemis.utils.SelectorTranslator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -78,6 +80,8 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final ProtonProtocolManager manager;
 
+   private final StorageManager storageManager;
+
    private final AMQPConnectionContext connection;
 
    private final Connection transportConnection;
@@ -100,6 +104,7 @@ public class AMQPSessionCallback implements SessionCallback {
                               OperationContext operationContext) {
       this.protonSPI = protonSPI;
       this.manager = manager;
+      this.storageManager = manager.getServer().getStorageManager();
       this.connection = connection;
       this.transportConnection = transportConnection;
       this.closeExecutor = executor;
@@ -134,6 +139,24 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
+   public void withinContext(RunnableEx run) throws Exception {
+      OperationContext context = recoverContext();
+      try {
+         run.run();
+      } finally {
+         resetContext(context);
+      }
+   }
+
+   public void afterIO(IOCallback ioCallback) {
+      OperationContext context = recoverContext();
+      try {
+         manager.getServer().getStorageManager().afterCompleteOperations(ioCallback);
+      } finally {
+         resetContext(context);
+      }
+   }
+
    @Override
    public void browserFinished(ServerConsumer consumer) {
 
@@ -315,11 +338,11 @@ public class AMQPSessionCallback implements SessionCallback {
    public void close() throws Exception {
       //need to check here as this can be called if init fails
       if (serverSession != null) {
-         recoverContext();
+         OperationContext context = recoverContext();
          try {
             serverSession.close(false);
          } finally {
-            resetContext();
+            resetContext(context);
          }
       }
    }
@@ -328,30 +351,30 @@ public class AMQPSessionCallback implements SessionCallback {
       if (transaction == null) {
          transaction = serverSession.getCurrentTransaction();
       }
-      recoverContext();
+      OperationContext oldContext = recoverContext();
       try {
          ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
       } finally {
-         resetContext();
+         resetContext(oldContext);
       }
    }
 
    public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws
Exception {
-      recoverContext();
+      OperationContext oldContext = recoverContext();
       try {
          ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
          ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
       } finally {
-         resetContext();
+         resetContext(oldContext);
       }
    }
 
    public void reject(Object brokerConsumer, Message message) throws Exception {
-      recoverContext();
+      OperationContext oldContext = recoverContext();
       try {
          ((ServerConsumer) brokerConsumer).reject(message.getMessageID());
       } finally {
-         resetContext();
+         resetContext(oldContext);
       }
    }
 
@@ -380,22 +403,26 @@ public class AMQPSessionCallback implements SessionCallback {
          }
       }
 
-      recoverContext();
+      OperationContext oldcontext = recoverContext();
 
-      PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
-      if (store.isRejectingMessages()) {
-         // We drop pre-settled messages (and abort any associated Tx)
-         if (delivery.remotelySettled()) {
-            if (transaction != null) {
-               String amqpAddress = delivery.getLink().getTarget().getAddress();
-               ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address
is full: " + amqpAddress);
-               transaction.markAsRollbackOnly(e);
+      try {
+         PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
+         if (store.isRejectingMessages()) {
+            // We drop pre-settled messages (and abort any associated Tx)
+            if (delivery.remotelySettled()) {
+               if (transaction != null) {
+                  String amqpAddress = delivery.getLink().getTarget().getAddress();
+                  ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address
is full: " + amqpAddress);
+                  transaction.markAsRollbackOnly(e);
+               }
+            } else {
+               rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full:
" + address);
             }
          } else {
-            rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full:
" + address);
+            serverSend(transaction, message, delivery, receiver);
          }
-      } else {
-         serverSend(transaction, message, delivery, receiver);
+      } finally {
+         resetContext(oldcontext);
       }
    }
 
@@ -406,61 +433,67 @@ public class AMQPSessionCallback implements SessionCallback {
       Rejected rejected = new Rejected();
       rejected.setError(condition);
 
-      connection.lock();
-      try {
-         delivery.disposition(rejected);
-         delivery.settle();
-      } finally {
-         connection.unlock();
-      }
-      connection.flush();
+      afterIO(new IOCallback() {
+         @Override
+         public void done() {
+            connection.lock();
+            try {
+               delivery.disposition(rejected);
+               delivery.settle();
+            } finally {
+               connection.unlock();
+            }
+            connection.flush();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+
+         }
+      });
+
    }
 
    private void serverSend(final Transaction transaction,
                            final Message message,
                            final Delivery delivery,
                            final Receiver receiver) throws Exception {
-      try {
-
-         message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
+      message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
 
-         serverSession.send(transaction, message, false, false);
+      serverSession.send(transaction, message, false, false);
 
-         manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback()
{
-            @Override
-            public void done() {
-               connection.lock();
-               try {
-                  if (delivery.getRemoteState() instanceof TransactionalState) {
-                     TransactionalState txAccepted = new TransactionalState();
-                     txAccepted.setOutcome(Accepted.getInstance());
-                     txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
-
-                     delivery.disposition(txAccepted);
-                  } else {
-                     delivery.disposition(Accepted.getInstance());
-                  }
-                  delivery.settle();
-               } finally {
-                  connection.unlock();
+      afterIO(new IOCallback() {
+         @Override
+         public void done() {
+            connection.lock();
+            try {
+               if (delivery.getRemoteState() instanceof TransactionalState) {
+                  TransactionalState txAccepted = new TransactionalState();
+                  txAccepted.setOutcome(Accepted.getInstance());
+                  txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
+
+                  delivery.disposition(txAccepted);
+               } else {
+                  delivery.disposition(Accepted.getInstance());
                }
-               connection.flush();
+               delivery.settle();
+            } finally {
+               connection.unlock();
             }
+            connection.flush();
+         }
 
-            @Override
-            public void onError(int errorCode, String errorMessage) {
-               connection.lock();
-               try {
-                  receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode
+ ":" + errorMessage));
-                  connection.flush();
-               } finally {
-                  connection.unlock();
-               }
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+            connection.lock();
+            try {
+               receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode
+ ":" + errorMessage));
+               connection.flush();
+            } finally {
+               connection.unlock();
             }
-         });
-      } finally {
-         resetContext();
-      }
+         }
+      });
    }
 
    public void offerProducerCredit(final String address,
@@ -502,12 +535,15 @@ public class AMQPSessionCallback implements SessionCallback {
       manager.getServer().destroyQueue(new SimpleString(queueName));
    }
 
-   private void resetContext() {
-      manager.getServer().getStorageManager().setContext(null);
+   public void resetContext(OperationContext oldContext) {
+      storageManager.setContext(oldContext);
    }
 
-   private void recoverContext() {
+   public OperationContext recoverContext() {
+
+      OperationContext oldContext = storageManager.getContext();
       manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
+      return oldContext;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index 441f3a6..6aa8fda 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback,
containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback,
containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
server.getScheduledPool());
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager,
amqpConnection, connection, executor);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ccc93b7..4d8bf53 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.Consumer;
@@ -486,6 +488,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
          return;
       }
 
+      OperationContext oldContext = sessionSPI.recoverContext();
+
       try {
          Message message = ((MessageReference) delivery.getContext()).getMessage();
 
@@ -590,7 +594,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
             // todo not sure if we need to do anything here
          }
       } finally {
-         connection.flush();
+         sessionSPI.afterIO(new IOCallback() {
+            @Override
+            public void done() {
+               connection.flush();
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+               connection.flush();
+            }
+         });
+
+         sessionSPI.resetContext(oldContext);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 4579f1c..bf2e575 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
 
 import java.nio.ByteBuffer;
 
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -118,24 +119,29 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler
{
             ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID,
true);
             tx.discharge();
 
-            if (discharge.getFail()) {
-               tx.rollback();
-               connection.lock();
-               try {
-                  delivery.disposition(new Accepted());
-               } finally {
-                  connection.unlock();
+            IOCallback ioAction = new IOCallback() {
+               @Override
+               public void done() {
+                  connection.lock();
+                  try {
+                     delivery.disposition(new Accepted());
+                  } finally {
+                     connection.unlock();
+                  }
                }
-               connection.flush();
-            } else {
-               tx.commit();
-               connection.lock();
-               try {
-                  delivery.disposition(new Accepted());
-               } finally {
-                  connection.unlock();
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+
                }
-               connection.flush();
+            };
+
+            if (discharge.getFail()) {
+               sessionSPI.withinContext(() -> tx.rollback());
+               sessionSPI.afterIO(ioAction);
+            } else {
+               sessionSPI.withinContext(() -> tx.commit());
+               sessionSPI.afterIO(ioAction);
             }
          }
       } catch (ActiveMQAMQPException amqpE) {
@@ -157,13 +163,23 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler
{
          }
          connection.flush();
       } finally {
-         connection.lock();
-         try {
-            delivery.settle();
-         } finally {
-            connection.unlock();
-         }
-         connection.flush();
+         sessionSPI.afterIO(new IOCallback() {
+            @Override
+            public void done() {
+               connection.lock();
+               try {
+                  delivery.settle();
+               } finally {
+                  connection.unlock();
+               }
+               connection.flush();
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+
+            }
+         });
       }
    }
 


Mime
View raw message