activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1423655 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker: PublishedAddressPolicy.java TransportConnector.java
Date Tue, 18 Dec 2012 21:15:07 GMT
Author: tabish
Date: Tue Dec 18 21:15:07 2012
New Revision: 1423655

URL: http://svn.apache.org/viewvc?rev=1423655&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3757
fix for: https://issues.apache.org/jira/browse/AMQ-3707
fix for: https://issues.apache.org/jira/browse/AMQ-4024

Add a strategy class for use in constructing the Published connect string for a transport
connector.  Allows for setting whether the connector will send IP address, hostname or FQDN
along with configurable transport query options which allow control of client side transport
and wireformat settings. Users can override with their own version to further customize.

Added:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java
  (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java?rev=1423655&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java
(added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java
Tue Dec 18 21:15:07 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.Locale;
+
+import org.apache.activemq.util.InetAddressUtil;
+
+/**
+ * Policy object that controls how a TransportConnector publishes the connector's
+ * address to the outside world.  By default the connector will publish itself
+ * using the resolved host name of the bound server socket.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class PublishedAddressPolicy {
+
+    private String clusterClientUriQuery;
+    private PublishedHostStrategy publishedHostStrategy = PublishedHostStrategy.DEFAULT;
+
+    /**
+     * Defines the strategy used to determine the published host value.
+     */
+    public enum PublishedHostStrategy {
+        DEFAULT,
+        IPADDRESS,
+        HOSTNAME,
+        FQDN;
+
+        public static PublishedHostStrategy getValue(String value) {
+            return valueOf(value.toUpperCase(Locale.ENGLISH));
+        }
+    }
+
+    /**
+     * Using the supplied TransportConnector this method returns the String that will
+     * be used to update clients with this connector's connect address.
+     *
+     * @param connector
+     *      The TransportConnector whose address is to be published.
+     * @return a string URI address that a client can use to connect to this Transport.
+     * @throws Exception
+     */
+    public String getPublishableConnectString(TransportConnector connector) throws Exception
{
+
+        URI connectorURI = connector.getConnectUri();
+
+        if (connectorURI == null) {
+            return null;
+        }
+
+        String scheme = connectorURI.getScheme();
+        String userInfo = getPublishedUserInfoValue(connectorURI.getUserInfo());
+        String host = getPublishedHostValue(connectorURI.getHost());
+        int port = connectorURI.getPort();
+        String path = getPublishedPathValue(connectorURI.getPath());
+        String fragment = getPublishedFragmentValue(connectorURI.getFragment());
+
+        URI publishedURI = new URI(scheme, userInfo, host, port, path, getClusterClientUriQuery(),
fragment);
+
+        return publishedURI.toString();
+    }
+
+    /**
+     * Subclasses can override what host value is published by implementing alternate
+     * logic for this method.
+     *
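+     * @param uriHostEntry the original value of the URI host.
+     * @return the desired value for the published URI's host.
+     * @throws UnknownHostException if the host entry cannot be resolved to an address.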
+     */
+    protected String getPublishedHostValue(String uriHostEntry) throws UnknownHostException
{
+
+        // By default we just republish what was already present.
+        String result = uriHostEntry;
+
+        if (this.publishedHostStrategy.equals(PublishedHostStrategy.IPADDRESS)) {
+            InetAddress address = InetAddress.getByName(uriHostEntry);
+            result = address.getHostAddress();
+        } else if (this.publishedHostStrategy.equals(PublishedHostStrategy.HOSTNAME)) {
+            InetAddress address = InetAddress.getByName(uriHostEntry);
+            if (address.isAnyLocalAddress()) {
+                // make it more human readable and useful, an alternative to 0.0.0.0
+                result = InetAddressUtil.getLocalHostName();
+            } else {
+                result = address.getHostName();
+            }
+        } else if (this.publishedHostStrategy.equals(PublishedHostStrategy.FQDN)) {
+            InetAddress address = InetAddress.getByName(uriHostEntry);
+            if (address.isAnyLocalAddress()) {
+                // make it more human readable and useful, an alternative to 0.0.0.0
+                result = InetAddressUtil.getLocalHostName();
+            } else {
+                result = address.getCanonicalHostName();
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Subclasses can override what path value is published by implementing alternate
+     * logic for this method.  By default this method simply returns what was already
+     * set as the Path value in the original URI.
+     *
+     * @param uriPathEntry
+     *      The original value of the URI path.
+     *
+     * @return the desired value for the published URI's path.
+     */
+    protected String getPublishedPathValue(String uriPathEntry) {
+        return uriPathEntry;
+    }
+
+    /**
+     * Subclasses can override what fragment value is published by implementing alternate
+     * logic for this method.  By default this method simply returns what was already
+     * set as the Fragment value in the original URI.
+     *
+     * @param uriFragmentEntry
+     *      The original value of the URI Fragment.
+     *
+     * @return the desired value for the published URI's Fragment.
+     */
+    protected String getPublishedFragmentValue(String uriFragmentEntry) {
+        return uriFragmentEntry;
+    }
+
+    /**
+     * Subclasses can override what user info value is published by implementing alternate
+     * logic for this method.  By default this method simply returns what was already
+     * set as the UserInfo value in the original URI.
+     *
+     * @param uriUserInfoEntry
+     *      The original value of the URI user info.
+     *
+     * @return the desired value for the published URI's user info.
+     */
+    protected String getPublishedUserInfoValue(String uriUserInfoEntry) {
+        return uriUserInfoEntry;
+    }
+
+    /**
+     * Gets the URI query that's configured on the published URI that's sent to clients
+     * when the cluster info is updated.
+     *
+     * @return the clusterClientUriQuery
+     */
+    public String getClusterClientUriQuery() {
+        return clusterClientUriQuery;
+    }
+
+    /**
+     * Sets the URI query that's configured on the published URI that's sent to clients
+     * when the cluster info is updated.
+     *
+     * @param clusterClientUriQuery the clusterClientUriQuery to set
+     */
+    public void setClusterClientUriQuery(String clusterClientUriQuery) {
+        this.clusterClientUriQuery = clusterClientUriQuery;
+    }
+
+    /**
+     * @return the publishedHostStrategy
+     */
+    public PublishedHostStrategy getPublishedHostStrategy() {
+        return publishedHostStrategy;
+    }
+
+    /**
+     * @param strategy the publishedHostStrategy to set
+     */
+    public void setPublishedHostStrategy(PublishedHostStrategy strategy) {
+        this.publishedHostStrategy = strategy;
+    }
+
+    /**
+     * @param strategy the name of the publishedHostStrategy to set
+     */
+    public void setPublishedHostStrategy(String strategy) {
+        this.publishedHostStrategy = PublishedHostStrategy.getValue(strategy);
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1423655&r1=1423654&r2=1423655&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
Tue Dec 18 21:15:07 2012
@@ -33,7 +33,10 @@ import org.apache.activemq.command.Broke
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.*;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactorySupport;
+import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
 import org.apache.activemq.util.ServiceStopper;
@@ -70,6 +73,7 @@ public class TransportConnector implemen
     private boolean auditNetworkProducers = false;
     private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
     private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
+    private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
 
     LinkedList<String> peerBrokers = new LinkedList<String>();
 
@@ -117,9 +121,11 @@ public class TransportConnector implemen
         rc.setAuditNetworkProducers(isAuditNetworkProducers());
         rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
         rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
+        rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
         return rc;
     }
 
+    @Override
     public BrokerInfo getBrokerInfo() {
         return brokerInfo;
     }
@@ -172,6 +178,7 @@ public class TransportConnector implemen
     /**
      * @return the statistics for this connector
      */
+    @Override
     public ConnectorStatistics getStatistics() {
         return statistics;
     }
@@ -188,6 +195,7 @@ public class TransportConnector implemen
         this.messageAuthorizationPolicy = messageAuthorizationPolicy;
     }
 
+    @Override
     public void start() throws Exception {
         broker = brokerService.getBroker();
         brokerInfo.setBrokerName(broker.getBrokerName());
@@ -196,9 +204,11 @@ public class TransportConnector implemen
         brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
         brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
         getServer().setAcceptListener(new TransportAcceptListener() {
+            @Override
             public void onAccept(final Transport transport) {
                 try {
                     brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                        @Override
                         public void run() {
                             try {
                                 Connection connection = createConnection(transport);
@@ -217,6 +227,7 @@ public class TransportConnector implemen
                 }
             }
 
+            @Override
             public void onAcceptError(Exception error) {
                 onAcceptError(error, null);
             }
@@ -244,25 +255,14 @@ public class TransportConnector implemen
     }
 
     public String getPublishableConnectString() throws Exception {
-        return getPublishableConnectString(getConnectUri());
-    }
-
-    public String getPublishableConnectString(URI theConnectURI) throws Exception {
-        String publishableConnectString = null;
-        if (theConnectURI != null) {
-            publishableConnectString = theConnectURI.toString();
-            // strip off server side query parameters which may not be compatible to clients
-            if (theConnectURI.getRawQuery() != null) {
-                publishableConnectString = publishableConnectString.substring(0, publishableConnectString
-                        .indexOf(theConnectURI.getRawQuery()) - 1);
-            }
-        }
+        String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Publishing: " + publishableConnectString + " for broker transport
URI: " + theConnectURI);
+            LOG.debug("Publishing: " + publishableConnectString + " for broker transport
URI: " + getConnectUri());
         }
         return publishableConnectString;
     }
 
+    @Override
     public void stop() throws Exception {
         ServiceStopper ss = new ServiceStopper();
         if (discoveryAgent != null) {
@@ -425,6 +425,7 @@ public class TransportConnector implemen
         }
     }
 
+    @Override
     public void updateClientClusterInfo() {
         if (isRebalanceClusterClients() || isUpdateClusterClients()) {
             ConnectionControl control = getConnectionControl();
@@ -488,6 +489,7 @@ public class TransportConnector implemen
     /**
      * This is called by the BrokerService right before it starts the transport.
      */
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
@@ -503,6 +505,7 @@ public class TransportConnector implemen
     /**
      * @return the updateClusterClients
      */
+    @Override
     public boolean isUpdateClusterClients() {
         return this.updateClusterClients;
     }
@@ -518,6 +521,7 @@ public class TransportConnector implemen
     /**
      * @return the rebalanceClusterClients
      */
+    @Override
     public boolean isRebalanceClusterClients() {
         return this.rebalanceClusterClients;
     }
@@ -533,6 +537,7 @@ public class TransportConnector implemen
     /**
      * @return the updateClusterClientsOnRemove
      */
+    @Override
     public boolean isUpdateClusterClientsOnRemove() {
         return this.updateClusterClientsOnRemove;
     }
@@ -559,6 +564,7 @@ public class TransportConnector implemen
         this.updateClusterFilter = updateClusterFilter;
     }
 
+    @Override
     public int connectionCount() {
         return connections.size();
     }
@@ -591,4 +597,24 @@ public class TransportConnector implemen
     public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection)
{
         this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
     }
+
+    /**
+     * Gets the currently configured policy for creating the published connection address
of this
+     * TransportConnector.
+     *
+     * @return the publishedAddressPolicy
+     */
+    public PublishedAddressPolicy getPublishedAddressPolicy() {
+        return publishedAddressPolicy;
+    }
+
+    /**
+     * Sets the configured policy for creating the published connection address of this
+     * TransportConnector.
+     *
+     * @param publishedAddressPolicy the publishedAddressPolicy to set
+     */
+    public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy)
{
+        this.publishedAddressPolicy = publishedAddressPolicy;
+    }
 }



Mime
View raw message