activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: Make BrokerService visible to the protocol converter so that in the future we can use it to better manage durable subscriptions and link reattach behavior.
Date Tue, 28 Oct 2014 18:58:31 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 135226533 -> adafdfe97


Make BrokerService visible to the protocol converter so that in the
future we can use it to better manage durable subscriptions and link
reattach behavior.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/adafdfe9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/adafdfe9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/adafdfe9

Branch: refs/heads/trunk
Commit: adafdfe97d9522d399ebe32f6c28fe31619b15a9
Parents: 1352265
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Oct 28 14:58:17 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Oct 28 14:58:17 2014 -0400

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         | 19 ++++++++++------
 .../transport/amqp/AMQPSslTransportFactory.java |  7 +++---
 .../transport/amqp/AmqpNioTransportFactory.java |  7 +++---
 .../transport/amqp/AmqpProtocolConverter.java   | 24 ++++++++++++++++++--
 .../activemq/transport/amqp/AmqpSupport.java    |  1 +
 .../transport/amqp/AmqpTransportFactory.java    |  7 +++---
 .../transport/amqp/AmqpTransportFilter.java     |  6 ++---
 7 files changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index 09478d7..a7607af 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.Command;
 
 /**
@@ -29,14 +30,16 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter
{
 
     private static final int DEFAULT_PREFETCH = 100;
 
-    final private AmqpTransport transport;
+    private final AmqpTransport transport;
+    private final BrokerService brokerService;
+
     private int prefetch = DEFAULT_PREFETCH;
     private int producerCredit = DEFAULT_PREFETCH;
 
     interface Discriminator {
         boolean matches(AmqpHeader header);
 
-        IAmqpProtocolConverter create(AmqpTransport transport);
+        IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
     }
 
     static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
@@ -44,8 +47,8 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter
{
         DISCRIMINATORS.add(new Discriminator() {
 
             @Override
-            public IAmqpProtocolConverter create(AmqpTransport transport) {
-                return new AmqpProtocolConverter(transport);
+            public IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService)
{
+                return new AmqpProtocolConverter(transport, brokerService);
             }
 
             @Override
@@ -60,13 +63,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter
{
                 return false;
             }
         });
-
     }
 
     final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
 
-    public AMQPProtocolDiscriminator(AmqpTransport transport) {
+    public AMQPProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService)
{
         this.transport = transport;
+        this.brokerService = brokerService;
     }
 
     @Override
@@ -80,11 +83,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter
{
                     match = discriminator;
                 }
             }
+
             // Lets use first in the list if none are a good match.
             if (match == null) {
                 match = DISCRIMINATORS.get(0);
             }
-            IAmqpProtocolConverter next = match.create(transport);
+
+            IAmqpProtocolConverter next = match.create(transport, brokerService);
             next.setPrefetch(prefetch);
             next.setProducerCredit(producerCredit);
             transport.setProtocolConverter(next);

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
index 4c036a9..4d7af7e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
@@ -33,7 +32,7 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware
{
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     @Override
     protected String getDefaultWireFormatType() {
@@ -43,7 +42,7 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements
Brok
     @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
-        transport = new AmqpTransportFilter(transport, format, brokerContext);
+        transport = new AmqpTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
@@ -63,7 +62,7 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements
Brok
 
     @Override
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
index c67d3b6..b017937 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
 
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
@@ -43,7 +42,7 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware
{
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     @Override
     protected String getDefaultWireFormatType() {
@@ -81,14 +80,14 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements
Brok
     @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
-        transport = new AmqpTransportFilter(transport, format, brokerContext);
+        transport = new AmqpTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
 
     @Override
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 62966bc..7b3b825 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -19,10 +19,12 @@ package org.apache.activemq.transport.amqp;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -30,6 +32,7 @@ import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidSelectorException;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -55,8 +58,10 @@ import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.store.PersistenceAdapterSupport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -114,7 +119,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
     private static final int CHANNEL_MAX = 32767;
-    private final AmqpTransport amqpTransport;
     private static final Symbol COPY = Symbol.getSymbol("copy");
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
@@ -122,6 +126,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Symbol JMS_MAPPING_VERSION = Symbol.valueOf("x-opt-jms-mapping-version");
     private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
+    private final AmqpTransport amqpTransport;
+    private final BrokerService brokerService;
+
     protected int prefetch;
     protected int producerCredit;
     protected Transport protonTransport = Proton.transport();
@@ -129,8 +136,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     protected Collector eventCollector = new CollectorImpl();
     protected boolean useByteDestinationTypeAnnotation;
 
-    public AmqpProtocolConverter(AmqpTransport transport) {
+    public AmqpProtocolConverter(AmqpTransport transport, BrokerService brokerService) {
         this.amqpTransport = transport;
+        this.brokerService = brokerService;
 
         // the configured maxFrameSize on the URI.
         int maxFrameSize = transport.getWireFormat().getMaxAmqpFrameSize();
@@ -1468,4 +1476,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     public void setProducerCredit(int producerCredit) {
         this.producerCredit = producerCredit;
     }
+
+    @SuppressWarnings("unused")
+    private List<SubscriptionInfo> lookupSubscriptions() throws AmqpProtocolException
{
+        List<SubscriptionInfo> subscriptions = Collections.emptyList();
+        try {
+            subscriptions = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(),
connectionInfo.getClientId());
+        } catch (IOException e) {
+            throw new AmqpProtocolException("Error loading store subscriptions", true, e);
+        }
+
+        return subscriptions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index e3680c5..9a01f7b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import org.fusesource.hawtbuf.Buffer;
 
 /**
+ *
  */
 public class AmqpSupport {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
index e394c85..3ca8ea1 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
@@ -33,7 +32,7 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware
{
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     @Override
     protected String getDefaultWireFormatType() {
@@ -43,14 +42,14 @@ public class AmqpTransportFactory extends TcpTransportFactory implements
BrokerS
     @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
-        transport = new AmqpTransportFilter(transport, format, brokerContext);
+        transport = new AmqpTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
 
     @Override
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/activemq/blob/adafdfe9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index ec63ae7..5fb7a04 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -20,7 +20,7 @@ import java.io.IOException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
@@ -49,9 +49,9 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
     private final ReentrantLock lock = new ReentrantLock();
 
-    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext)
{
+    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService)
{
         super(next);
-        this.protocolConverter = new AMQPProtocolDiscriminator(this);
+        this.protocolConverter = new AMQPProtocolDiscriminator(this, brokerService);
         if (wireFormat instanceof AmqpWireFormat) {
             this.wireFormat = (AmqpWireFormat) wireFormat;
         }


Mime
View raw message