activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r645580 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
Date Mon, 07 Apr 2008 15:59:36 GMT
Author: chirino
Date: Mon Apr  7 08:59:34 2008
New Revision: 645580

URL: http://svn.apache.org/viewvc?rev=645580&view=rev
Log:
Applied patch https://issues.apache.org/activemq/browse/AMQ-1587

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java?rev=645580&r1=645579&r2=645580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java Mon Apr  7 08:59:34 2008
@@ -17,11 +17,12 @@
 package org.apache.activemq.network;
 
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Hashtable;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 
+import javax.naming.CommunicationException;
 import javax.naming.Context;
 import javax.naming.NamingEnumeration;
 import javax.naming.directory.Attributes;
@@ -29,8 +30,14 @@
 import javax.naming.directory.InitialDirContext;
 import javax.naming.directory.SearchControls;
 import javax.naming.directory.SearchResult;
+import javax.naming.event.EventDirContext;
+import javax.naming.event.NamespaceChangeListener;
+import javax.naming.event.NamingEvent;
+import javax.naming.event.NamingExceptionEvent;
+import javax.naming.event.ObjectChangeListener;
 
-import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -46,178 +53,405 @@
  *
  * @org.apache.xbean.XBean element="ldapNetworkConnector"
  */
-public class LdapNetworkConnector extends NetworkConnector {
-    private static final Log LOG = LogFactory.getLog(LdapNetworkConnector.class);
-
-    // TODO: future >> LDAP JNDI event handling to update connectors?
-
-    // force returned entries to implement the ipHost and ipService objectClasses (RFC 2307)
-    private static final String REQUIRED_OBJECT_CLASS_FILTER  = "(&(objectClass=ipHost)(objectClass=ipService))";
-
-    // required
-    private URI    ldapURI;
-    private String ldapBase;
-    private String ldapUser;
-    private String ldapPassword;
-
-    // optional
-    private int    ldapSearchScope  =  SearchControls.OBJECT_SCOPE;
-    private String ldapSearchFilter =  REQUIRED_OBJECT_CLASS_FILTER;
-
-    // internal configurables
-    private DirContext ldapContext;
-    private List<NetworkConnector> connectors = new CopyOnWriteArrayList<NetworkConnector>();
-
-    /**
-     * default constructor
-     */
-    public LdapNetworkConnector() {
-    }
-
-    /**
-     * sets the LDAP server URI
-     *
-     * @param uri LDAP server URI
-     */
-    public void setUri(URI uri) {
-        ldapURI = uri;
-    }
-
-    /**
-     * sets the base LDAP dn used for lookup operations
-     *
-     * @param base LDAP base dn
-     */
-    public void setBase(String base) {
-        ldapBase = base;
-    }
-
-    /**
-     * sets the LDAP user for access credentials
-     *
-     * @param user LDAP dn of user
-     */
-    public void setUser(String user) {
-        ldapUser = user;
-    }
-
-    /**
-     * sets the LDAP password for access credentials
-     *
-     * @param password user password
-     */
-    public void setPassword(String password) {
-        ldapPassword = password;
-    }
-
-    /**
-     * sets the LDAP search scope
-     *
-     * @param searchScope LDAP JNDI search scope
-     */
-    public void setSearchScope(String searchScope) throws Exception {
-        if(searchScope.equals("OBJECT_SCOPE")) {
-            ldapSearchScope = SearchControls.OBJECT_SCOPE;
-        }
-        else if(searchScope.equals("ONELEVEL_SCOPE")) {
-            ldapSearchScope = SearchControls.ONELEVEL_SCOPE;
-        }
-        else if(searchScope.equals("SUBTREE_SCOPE")) {
-            ldapSearchScope = SearchControls.SUBTREE_SCOPE;
-        }
-        else {
-          throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope);
-        }
-    }
-
-    /**
-     * sets the LDAP search filter as defined in RFC 2254
-     *
-     * @param searchFilter LDAP search filter
-     * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
-     */
-    public void setSearchFilter(String searchFilter) {
-        ldapSearchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))";
-    }
-
-    /**
-     * start the connector
-     */
-    // XXX: this method seems awfully redundant when looking through the
-    //      call stack when used in NetworkConnector based objects. I don't
-    //      see why derived classes shouldn't just override the start/stop methods
-    protected void handleStart() throws Exception {
-        initLdapContext();
-        for(URI uri : getLdapUris()) {
-            NetworkConnector connector = getBrokerService().addNetworkConnector(uri);
-            connector.start();
-            connectors.add(connector);
-        }
-        super.handleStart();
-    }
-
-    /**
-     * stop the connector
-     *
-     * @param stopper service stopper object
-     */
-    // XXX: this method seems awfully redundant when looking through the
-    //      call stack when used in NetworkConnector based objects. I don't
-    //      see why derived classes shouldn't just override the start/stop methods
-    protected void handleStop(ServiceStopper stopper) throws Exception {
-        for(NetworkConnector connector : connectors) {
-            getBrokerService().removeNetworkConnector(connector);
-            connector.stop();
-        }
-        ldapContext.close();
-        super.handleStop(stopper);
-    }
-
-    /**
-     * returns the name of the connector
-     *
-     * @return connector name
-     */
-    // XXX: this should probably be fixed elsewhere for all
-    //      NetworkConnector derivatives...this impl does not
-    //      seem to be well thought out?
-    public String getName() {
-        return toString();
-    }
-
-    /**
-     * initializes the LDAP JNDI context with the configured parameters
-     */
-    protected void initLdapContext() throws Exception {
-        Hashtable env = new Hashtable();
-        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
-        env.put(Context.PROVIDER_URL,            ldapURI.toString());
-        env.put(Context.SECURITY_PRINCIPAL,      ldapUser);
-        env.put(Context.SECURITY_CREDENTIALS,    ldapPassword);
-        ldapContext = new InitialDirContext(env);
-    }
-
-    /**
-     * retrieves URIs matching the search filter via LDAP 
-     * and creates network connectors based on the entries
-     *
-     * @returns list of retrieved URIs
-     */
-    protected List<URI> getLdapUris() throws Exception {
-        SearchControls controls = new SearchControls();
-        controls.setSearchScope(ldapSearchScope);
-        NamingEnumeration<SearchResult> results = ldapContext.search(ldapBase, ldapSearchFilter, controls);
-
-        List<URI> uriList = new ArrayList();
-        while(results.hasMore()) {
-            Attributes attributes = results.next().getAttributes();
-            String address  = (String)attributes.get("iphostnumber").get();
-            String port     = (String)attributes.get("ipserviceport").get();
-            String protocol = (String)attributes.get("ipserviceprotocol").get();
-            URI uri = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
-            LOG.info("Discovered URI " + uri);
-            uriList.add(uri);
-        }
-        return uriList;
-    }
+public class      LdapNetworkConnector
+       extends    NetworkConnector
+       implements NamespaceChangeListener,
+                  ObjectChangeListener
+{
+   private static final Log LOG = LogFactory.getLog(LdapNetworkConnector.class);
+
+   // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
+   private static final String REQUIRED_OBJECT_CLASS_FILTER  = "(&(objectClass=ipHost)(objectClass=ipService))";
+
+   // connection
+   private URI[]   availableURIs      = null;
+   private int     availableURIsIndex = 0;
+   private String  base               = null;
+   private boolean failover           = false;
+   private long    curReconnectDelay  = 1000;  /* 1 sec */
+   private long    maxReconnectDelay  = 30000; /* 30 sec */
+
+   // authentication
+   private String  user                    = null;
+   private String  password                = null;
+   private boolean anonymousAuthentication = false;
+
+   // search
+   private SearchControls searchControls      = new SearchControls(/* ONELEVEL_SCOPE */);
+   private String         searchFilter        = REQUIRED_OBJECT_CLASS_FILTER;
+   private boolean        searchEventListener = false;
+
+   // connector management
+   private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap();
+   private Map<URI, Integer>          referenceMap = new ConcurrentHashMap();
+   private Map<String, URI>           uuidMap      = new ConcurrentHashMap();
+
+   // local context
+   private DirContext context = null;
+
+   /**
+    * returns the next URI from the configured list, cycling through it in order
+    *
+    * @return next URI from the configured list (the index is advanced before it is used)
+    */
+   public URI getUri()
+      { return availableURIs[++availableURIsIndex % availableURIs.length]; }
+
+   /**
+    * sets the LDAP server URI
+    *
+    * @param _uri LDAP server URI
+    */
+   public void setUri(URI _uri)
+      throws Exception
+   {
+      CompositeData data = URISupport.parseComposite(_uri);
+      if(data.getScheme().equals("failover"))
+      {
+         availableURIs = data.getComponents();
+         failover = true;
+      }
+      else
+         { availableURIs = new URI[]{ _uri }; }
+   }
+
+   /**
+    * sets the base LDAP dn used for lookup operations
+    *
+    * @param _base LDAP base dn
+    */
+   public void setBase(String _base)
+      { base = _base; }
+
+   /**
+    * sets the LDAP user for access credentials
+    *
+    * @param _user LDAP dn of user
+    */
+   public void setUser(String _user)
+      { user = _user; }
+
+   /**
+    * sets the LDAP password for access credentials
+    *
+    * @param _password user password
+    */
+   public void setPassword(String _password)
+      { password = _password; }
+
+   /**
+    * sets LDAP anonymous authentication access credentials
+    *
+    * @param _anonymousAuthentication set to true to use anonymous authentication
+    */
+   public void setAnonymousAuthentication(boolean _anonymousAuthentication)
+      { anonymousAuthentication = _anonymousAuthentication; }
+
+   /**
+    * sets the LDAP search scope
+    *
+    * @param _searchScope LDAP JNDI search scope
+    */
+   public void setSearchScope(String _searchScope)
+      throws Exception
+   {
+      int scope;
+      if(_searchScope.equals("OBJECT_SCOPE"))
+         { scope = SearchControls.OBJECT_SCOPE; }
+      else if(_searchScope.equals("ONELEVEL_SCOPE"))
+         { scope = SearchControls.ONELEVEL_SCOPE; }
+      else if(_searchScope.equals("SUBTREE_SCOPE"))
+         { scope = SearchControls.SUBTREE_SCOPE; }
+      else
+         { throw new Exception("ERR: unknown LDAP search scope specified: " + _searchScope); }
+      searchControls.setSearchScope(scope);
+   }
+
+   /**
+    * sets the LDAP search filter as defined in RFC 2254
+    *
+    * @param _searchFilter LDAP search filter
+    * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
+    */
+   public void setSearchFilter(String _searchFilter)
+      { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; }
+
+   /**
+    * enables/disable a persistent search to the LDAP server as defined
+    * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
+    *
+    * @param _searchEventListener enable = true, disable = false (default)
+    * @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
+    */
+   public void setSearchEventListener(boolean _searchEventListener)
+      { searchEventListener = _searchEventListener; }
+
+   /**
+    * start the connector
+    */
+   public void start()
+      throws Exception
+   {
+      LOG.info("connecting...");
+      Hashtable<String, String> env = new Hashtable();
+      env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
+      URI uri = getUri();
+      LOG.debug("    URI [" + uri + "]");
+      env.put(Context.PROVIDER_URL, uri.toString());
+      if(anonymousAuthentication)
+      {
+         LOG.debug("    login credentials [anonymous]");
+         env.put(Context.SECURITY_AUTHENTICATION, "none");
+      }
+      else
+      {
+         LOG.debug("    login credentials [" + user + ":******]");
+         env.put(Context.SECURITY_PRINCIPAL,   user);
+         env.put(Context.SECURITY_CREDENTIALS, password);
+      }
+      boolean isConnected = false;
+      while(!isConnected)
+      {
+         try
+         {
+            context = new InitialDirContext(env);
+            isConnected = true;
+         }
+         catch(CommunicationException err)
+         {
+            if(failover)
+            {
+               uri = getUri();
+               LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + uri.toString() + "]");
+               env.put(Context.PROVIDER_URL, uri.toString());
+               Thread.sleep(curReconnectDelay);
+               curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
+            }
+            else
+               { throw err; }
+         }
+      }
+
+      // add connectors from search results
+      LOG.info("searching for network connectors...");
+      LOG.debug("    base   [" + base + "]");
+      LOG.debug("    filter [" + searchFilter + "]");
+      LOG.debug("    scope  [" + searchControls.getSearchScope() + "]");
+      NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
+      while(results.hasMore())
+         { addConnector(results.next()); }
+
+      // register persistent search event listener
+      if(searchEventListener)
+      {
+         LOG.info("registering persistent search listener...");
+         EventDirContext eventContext = (EventDirContext)context.lookup("");
+         eventContext.addNamingListener(base, searchFilter, searchControls, this);
+      }
+      else // otherwise close context (i.e. connection as it is no longer needed)
+         { context.close(); }
+   }
+
+   /**
+    * stop the connector: stops all managed network connectors, clears the bookkeeping maps and closes the LDAP context
+    */
+   public void stop()
+      throws Exception
+   {
+      LOG.info("stopping context...");
+      for(NetworkConnector connector : connectorMap.values())
+         { connector.stop(); }
+      connectorMap.clear();
+      referenceMap.clear();
+      uuidMap.clear();
+      context.close();
+   }
+
+   /**
+    * returns the name of the connector
+    *
+    * @return connector name
+    */
+   public String getName()
+      { return toString(); }
+
+   /**
+    * add connector of the given URI
+    *
+    * @param result search result of connector to add
+    */
+   protected synchronized void addConnector(SearchResult result)
+      throws Exception
+   {
+      String uuid = toUUID(result);
+      if(uuidMap.containsKey(uuid))
+      {
+         LOG.warn("connector already regsitered for UUID [" + uuid + "]");
+         return;
+      }
+
+      URI connectorURI = toURI(result);
+      if(connectorMap.containsKey(connectorURI))
+      {
+         int referenceCount = referenceMap.get(connectorURI) + 1;
+         LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
+         referenceMap.put(connectorURI, referenceCount);
+         uuidMap.put(uuid, connectorURI);
+         return;
+      }
+
+      // FIXME: disable JMX listing of LDAP managed connectors, we will
+      //       want to map/manage these differently in the future
+//      boolean useJMX = getBrokerService().isUseJmx();
+//      getBrokerService().setUseJmx(false);
+      NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
+//      getBrokerService().setUseJmx(useJMX);
+
+      // propagate std connector properties that may have been set via XML
+      connector.setDynamicOnly(isDynamicOnly());
+      connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
+      connector.setNetworkTTL(getNetworkTTL());
+      connector.setConduitSubscriptions(isConduitSubscriptions());
+      connector.setExcludedDestinations(getExcludedDestinations());
+      connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
+      connector.setDuplex(isDuplex());
+
+      // XXX: set in the BrokerService.startAllConnectors method and is 
+      //      required to prevent remote broker exceptions upon connection
+      connector.setLocalUri(getBrokerService().getVmConnectorURI());
+      connector.setBrokerName(getBrokerService().getBrokerName());
+      connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
+
+      // start network connector
+      connectorMap.put(connectorURI, connector);
+      referenceMap.put(connectorURI, 1);
+      uuidMap.put(uuid, connectorURI);
+      connector.start();
+      LOG.info("connector added with URI [" + connectorURI + "]");
+   }
+
+   /**
+    * remove connector of the given URI
+    *
+    * @param result search result of connector to remove
+    */
+   protected synchronized void removeConnector(SearchResult result)
+      throws Exception
+   {
+      String uuid = toUUID(result);
+      if(!uuidMap.containsKey(uuid))
+      {
+         LOG.warn("connector not regsitered for UUID [" + uuid + "]");
+         return;
+      }
+
+      URI connectorURI = uuidMap.get(uuid);
+      if(!connectorMap.containsKey(connectorURI))
+      {
+         LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
+         return;
+      }
+
+      int referenceCount = referenceMap.get(connectorURI) - 1;
+      referenceMap.put(connectorURI, referenceCount);
+      uuidMap.remove(uuid);
+      LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
+
+      if(referenceCount > 0)
+         { return; }
+
+      NetworkConnector connector = connectorMap.remove(connectorURI);
+      connector.stop();
+      LOG.info("connector removed with URI [" + connectorURI + "]");
+   }
+
+   /**
+    * convert search result into URI
+    *
+    * @param result search result to convert to URI
+    */
+   protected URI toURI(SearchResult result)
+      throws Exception
+   {
+      Attributes attributes = result.getAttributes();
+      String address  = (String)attributes.get("iphostnumber").get();
+      String port     = (String)attributes.get("ipserviceport").get();
+      String protocol = (String)attributes.get("ipserviceprotocol").get();
+      URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
+      LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
+      return connectorURI;
+   }
+
+   /**
+    * convert search result into the UUID string (its full name in the namespace)
+    *
+    * @param result search result to convert to UUID
+    */
+   protected String toUUID(SearchResult result)
+   {
+      String uuid = result.getNameInNamespace();
+      LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
+      return uuid;
+   }
+
+   /**
+    * invoked when an entry has been added during a persistent search
+    */
+   public void objectAdded(NamingEvent event)
+   {
+      LOG.debug("entry added");
+      try
+         { addConnector((SearchResult)event.getNewBinding()); }
+      catch(Exception err)
+         { LOG.error("ERR: caught unexpected exception", err); }
+   }
+
+   /**
+    * invoked when an entry has been removed during a persistent search
+    */
+   public void objectRemoved(NamingEvent event)
+   {
+      LOG.debug("entry removed");
+      try
+         { removeConnector((SearchResult)event.getOldBinding()); }
+      catch(Exception err)
+         { LOG.error("ERR: caught unexpected exception", err); }
+   }
+
+   /**
+    * invoked when an entry has been renamed during a persistent search
+    */
+   public void objectRenamed(NamingEvent event)
+   {
+      LOG.debug("entry renamed");
+      // XXX: getNameInNamespace method does not seem to work properly,
+      //      but getName seems to provide the result we want
+      String uuidOld = event.getOldBinding().getName();
+      String uuidNew = event.getNewBinding().getName();
+      URI connectorURI = uuidMap.remove(uuidOld);
+      uuidMap.put(uuidNew, connectorURI);
+      LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
+   }
+
+   /**
+    * invoked when an entry has been changed during a persistent search; the connector for the new binding is removed and then re-added
+    */
+   public void objectChanged(NamingEvent event)
+   {
+      LOG.debug("entry changed");
+      try
+      {
+         SearchResult result = (SearchResult)event.getNewBinding();
+         removeConnector(result);
+         addConnector(result);
+      }
+      catch(Exception err)
+         { LOG.error("ERR: caught unexpected exception", err); }
+   }
+
+   /**
+    * invoked when an exception has occurred during a persistent search
+    */
+   public void namingExceptionThrown(NamingExceptionEvent event)
+      { LOG.error("ERR: caught unexpected exception", event.getException()); }
 }



Mime
View raw message