activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/8] activemq-artemis git commit: ARTEMIS-1512 Fix race condition with Subscribe receipt
Date Mon, 13 Nov 2017 22:02:41 GMT
ARTEMIS-1512 Fix race condition with Subscribe receipt


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/120fc190
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/120fc190
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/120fc190

Branch: refs/heads/master
Commit: 120fc190c6520b7a093cbc802688c31ab54bf136
Parents: a5c443a
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Fri Nov 10 12:35:46 2017 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Nov 13 16:55:47 2017 -0500

----------------------------------------------------------------------
 .../core/protocol/stomp/StompConnection.java    | 14 ++---
 .../stomp/StompPostReceiptFunction.java         | 21 ++++++++
 .../protocol/stomp/StompProtocolManager.java    | 16 ++++--
 .../core/protocol/stomp/StompSession.java       | 19 +++----
 .../stomp/VersionedStompFrameHandler.java       | 56 +++++++++++---------
 5 files changed, 77 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/120fc190/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 13b7b86..96859bc 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -343,7 +343,7 @@ public final class StompConnection implements RemotingConnection {
 
          StompFrame frame = frameHandler.createStompFrame(Stomp.Responses.ERROR);
          frame.addHeader(Stomp.Headers.Error.MESSAGE, me.getMessage());
-         sendFrame(frame);
+         sendFrame(frame, null);
 
          destroyed = true;
       }
@@ -552,7 +552,7 @@ public final class StompConnection implements RemotingConnection {
       }
 
       if (reply != null) {
-         sendFrame(reply);
+         sendFrame(reply, null);
       }
 
       if (Stomp.Commands.DISCONNECT.equals(cmd)) {
@@ -560,8 +560,8 @@ public final class StompConnection implements RemotingConnection {
       }
    }
 
-   public void sendFrame(StompFrame frame) {
-      manager.sendReply(this, frame);
+   public void sendFrame(StompFrame frame, StompPostReceiptFunction function) {
+      manager.sendReply(this, frame, function);
    }
 
    public boolean validateUser(final String login, final String pass, final RemotingConnection
connection) {
@@ -660,7 +660,7 @@ public final class StompConnection implements RemotingConnection {
       }
    }
 
-   void subscribe(String destination,
+   StompPostReceiptFunction subscribe(String destination,
                   String selector,
                   String ack,
                   String id,
@@ -694,7 +694,7 @@ public final class StompConnection implements RemotingConnection {
       }
 
       try {
-         manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector,
ack, noLocal);
+         return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination,
selector, ack, noLocal);
       } catch (ActiveMQStompException e) {
          throw e;
       } catch (Exception e) {
@@ -743,7 +743,7 @@ public final class StompConnection implements RemotingConnection {
 
    //send a ping stomp frame
    public void ping(StompFrame pingFrame) {
-      manager.sendReply(this, pingFrame);
+      manager.sendReply(this, pingFrame, null);
    }
 
    public void physicalSend(StompFrame frame) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/120fc190/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompPostReceiptFunction.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompPostReceiptFunction.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompPostReceiptFunction.java
new file mode 100644
index 0000000..381b0f0
--- /dev/null
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompPostReceiptFunction.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.stomp;
+
+public interface StompPostReceiptFunction {
+   void afterReceipt();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/120fc190/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 84c78c2..888674c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -33,9 +33,9 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.remoting.CertificateUtil;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.remoting.CertificateUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -281,7 +281,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,
St
       });
    }
 
-   public void sendReply(final StompConnection connection, final StompFrame frame) {
+   public void sendReply(final StompConnection connection, final StompFrame frame, final
StompPostReceiptFunction function) {
       server.getStorageManager().afterCompleteOperations(new IOCallback() {
          @Override
          public void onError(final int errorCode, final String errorMessage) {
@@ -295,7 +295,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,
St
 
          @Override
          public void done() {
-            send(connection, frame);
+            if (frame != null) {
+               send(connection, frame);
+            }
+
+            if (function != null) {
+               function.afterReceipt();
+            }
          }
       });
    }
@@ -361,7 +367,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,
St
    }
    // Inner classes -------------------------------------------------
 
-   public void subscribe(StompConnection connection,
+   public StompPostReceiptFunction subscribe(StompConnection connection,
                          String subscriptionID,
                          String durableSubscriptionName,
                          String destination,
@@ -375,7 +381,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,
St
             ". Either use unique subscription IDs or do not create multiple subscriptions
for the same destination");
       }
       long consumerID = server.getStorageManager().generateID();
-      stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(),
durableSubscriptionName, destination, selector, ack);
+      return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(),
durableSubscriptionName, destination, selector, ack);
    }
 
    public void unsubscribe(StompConnection connection,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/120fc190/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 03b5757..33f5c7a 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -242,7 +242,7 @@ public class StompSession implements SessionCallback {
          StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);
          frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
          frame.setBody("consumer with ID " + consumerId + " disconnected by server");
-         connection.sendFrame(frame);
+         connection.sendFrame(frame, null);
       }
    }
 
@@ -278,7 +278,7 @@ public class StompSession implements SessionCallback {
       session.commit();
    }
 
-   public void addSubscription(long consumerID,
+   public StompPostReceiptFunction addSubscription(long consumerID,
                                String subscriptionID,
                                String clientID,
                                String durableSubscriptionName,
@@ -287,13 +287,11 @@ public class StompSession implements SessionCallback {
                                String ack) throws Exception {
       SimpleString queueName = SimpleString.toSimpleString(destination);
       boolean pubSub = false;
-      int receiveCredits = consumerCredits;
-      if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
-         receiveCredits = -1;
-      }
+      final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ?
-1 : consumerCredits;
 
       Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes();
-      if (routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST))
{
+      boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
+      if (topic) {
          // subscribes to a topic
          pubSub = true;
          if (durableSubscriptionName != null) {
@@ -308,15 +306,12 @@ public class StompSession implements SessionCallback {
             queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
             session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector),
true, false);
          }
-         session.createConsumer(consumerID, queueName, null, false, false, receiveCredits);
-      } else {
-         session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector),
false, false, receiveCredits);
       }
-
+      final ServerConsumer consumer = topic ? session.createConsumer(consumerID, queueName,
null, false, false, 0) : session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector),
false, false, 0);
       StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName,
pubSub);
       subscriptions.put(consumerID, subscription);
-
       session.start();
+      return () -> consumer.receiveCredits(receiveCredits);
    }
 
    public boolean unsubscribe(String id, String durableSubscriptionName, String clientID)
throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/120fc190/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index df6d9b0..bdae6fc 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -96,7 +96,7 @@ public abstract class VersionedStompFrameHandler {
       } else if (Stomp.Commands.ABORT.equals(request.getCommand())) {
          response = onAbort(request);
       } else if (Stomp.Commands.SUBSCRIBE.equals(request.getCommand())) {
-         response = onSubscribe(request);
+         return handleSubscribe(request);
       } else if (Stomp.Commands.UNSUBSCRIBE.equals(request.getCommand())) {
          response = onUnsubscribe(request);
       } else if (Stomp.Commands.CONNECT.equals(request.getCommand())) {
@@ -120,6 +120,21 @@ public abstract class VersionedStompFrameHandler {
       return response;
    }
 
+   private StompFrame handleSubscribe(StompFrame request) {
+      StompFrame response = null;
+      try {
+         StompPostReceiptFunction postProcessFunction = onSubscribe(request);
+         response = postprocess(request);
+         if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) {
+            response.addHeader(Stomp.Headers.Response.RECEIPT_ID, request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+         }
+         connection.sendFrame(response, postProcessFunction);
+         return null;
+      } catch (ActiveMQStompException e) {
+         return e.getFrame();
+      }
+
+   }
    public abstract StompFrame onConnect(StompFrame frame);
 
    public abstract StompFrame onDisconnect(StompFrame frame);
@@ -240,31 +255,22 @@ public abstract class VersionedStompFrameHandler {
       return response;
    }
 
-   public StompFrame onSubscribe(StompFrame frame) {
-      StompFrame response = null;
-      try {
-         String destination = getDestination(frame);
-
-         String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
-         String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
-         String id = frame.getHeader(Stomp.Headers.Subscribe.ID);
-         String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
-         if (durableSubscriptionName == null) {
-            durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
-         }
-         RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE),
frame.getHeader(Headers.Subscribe.DESTINATION));
-         boolean noLocal = false;
-
-         if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
-            noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
-         }
+   public StompPostReceiptFunction onSubscribe(StompFrame frame) throws ActiveMQStompException
{
+      String destination = getDestination(frame);
 
-         connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal,
routingType);
-      } catch (ActiveMQStompException e) {
-         response = e.getFrame();
+      String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+      String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+      String id = frame.getHeader(Stomp.Headers.Subscribe.ID);
+      String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+      if (durableSubscriptionName == null) {
+         durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
       }
-
-      return response;
+      RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE),
frame.getHeader(Headers.Subscribe.DESTINATION));
+      boolean noLocal = false;
+      if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
+         noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+      }
+      return connection.subscribe(destination, selector, ack, id, durableSubscriptionName,
noLocal, routingType);
    }
 
    public String getDestination(StompFrame request) {
@@ -334,7 +340,7 @@ public abstract class VersionedStompFrameHandler {
 
    //sends an ERROR frame back to client if possible then close the connection
    public void onError(ActiveMQStompException e) {
-      this.connection.sendFrame(e.getFrame());
+      this.connection.sendFrame(e.getFrame(), null);
       connection.destroy();
    }
 


Mime
View raw message