directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From elecha...@apache.org
Subject svn commit: r1417420 - in /directory/apacheds/trunk: protocol-ldap/src/main/java/org/apache/directory/server/ldap/ protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ server-integ/src/test/java/org/apache/directory/server...
Date Wed, 05 Dec 2012 13:49:35 GMT
Author: elecharny
Date: Wed Dec  5 13:49:34 2012
New Revision: 1417420

URL: http://svn.apache.org/viewvc?rev=1417420&view=rev
Log:
o Added a Pinger thread that regularly checks the connection we are using for replication :
it can detect abrupt disconnections
o Added a ping() method in the ReplicationConsumer interface
o Added some checks to avoid doing useless operations if we are already disconnected

Added:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/PingerThread.java
  (with props)
Modified:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
    directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java
    directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java?rev=1417420&r1=1417419&r2=1417420&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
Wed Dec  5 13:49:34 2012
@@ -32,7 +32,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.net.ssl.KeyManagerFactory;
 
@@ -66,6 +68,7 @@ import org.apache.directory.server.ldap.
 import org.apache.directory.server.ldap.handlers.response.SearchResultEntryHandler;
 import org.apache.directory.server.ldap.handlers.response.SearchResultReferenceHandler;
 import org.apache.directory.server.ldap.handlers.ssl.LdapsInitializer;
+import org.apache.directory.server.ldap.replication.consumer.PingerThread;
 import org.apache.directory.server.ldap.replication.consumer.ReplicationConsumer;
 import org.apache.directory.server.ldap.replication.consumer.ReplicationStatusEnum;
 import org.apache.directory.server.ldap.replication.provider.ReplicationRequestHandler;
@@ -684,8 +687,11 @@ public class LdapServer extends Director
      */
     public void startReplicationConsumers() throws Exception
     {
-        if ( replConsumers != null )
+        if ( ( replConsumers != null ) && ( replConsumers.size() > 0 ) )
         {
+            final PingerThread pingerThread = new PingerThread();
+            pingerThread.start();
+            
             for ( final ReplicationConsumer consumer : replConsumers )
             {
                 consumer.init( getDirectoryService() );
@@ -704,6 +710,8 @@ public class LdapServer extends Director
                                 
                                 if ( isConnected )
                                 {
+                                    pingerThread.addConsumer( consumer );
+
                                     // We are now connected, start the replication
                                     ReplicationStatusEnum status = null;
                                     

Added: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/PingerThread.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/PingerThread.java?rev=1417420&view=auto
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/PingerThread.java
(added)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/PingerThread.java
Wed Dec  5 13:49:34 2012
@@ -0,0 +1,110 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.ldap.replication.consumer;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A daemon thread that regularly pings the registered consumers, testing their connection to the provider.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+public class PingerThread extends Thread
+{
+    /** Logger for the replication consumer */
+    private static final Logger CONSUMER_LOG = LoggerFactory.getLogger( "CONSUMER_LOG" );
+
+    /** The consumers to ping ; run() iterates over it while addConsumer()/removeConsumer() update it */
+    private Queue<ReplicationConsumer> consumers = new ConcurrentLinkedQueue<ReplicationConsumer>();
+    
+    /** A flag to stop the pinger. NOTE(review): not volatile, run() may not see an update promptly - confirm */
+    private boolean stop = false;
+
+    /**
+     * Create a new instance of this thread, flagged as a daemon thread.
+     */
+    public PingerThread()
+    {
+        setDaemon( true );
+    }
+    
+    
+    /**
+     * Pings every registered consumer, then sleeps 5 seconds, until stopped or interrupted
+     */
+    public void run()
+    {
+        try
+        {
+            CONSUMER_LOG.debug( "Starting the provider's pinger" );
+
+            while ( !stop )
+            {
+                for ( ReplicationConsumer consumer : consumers )
+                {
+                    consumer.ping();
+                }
+
+                Thread.sleep( 5000 );
+            }
+        }
+        catch ( InterruptedException ie )
+        {
+            CONSUMER_LOG.debug( "The pinger has been interrupted" );
+        }
+    }
+    
+    
+    /**
+     * Add a new consumer to ping, unless it is already registered
+     * 
+     * @param consumer The consumer we want to ping
+     */
+    public void addConsumer( ReplicationConsumer consumer )
+    {
+        if ( !consumers.contains( consumer ) )
+        {
+            consumers.add( consumer );
+        }
+    }
+    
+    
+    /**
+     * Remove a consumer from the set of consumers to ping
+     * @param consumer The consumer we want to remove
+     */
+    public void removeConsumer( ReplicationConsumer consumer )
+    {
+        consumers.remove( consumer );
+    }
+    
+    
+    /**
+     * Asks the pinger to stop : the loop in run() exits the next time it checks the flag
+     */
+    public void stopPinging()
+    {
+        stop = true;
+    }
+}
\ No newline at end of file

Propchange: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/PingerThread.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java?rev=1417420&r1=1417419&r2=1417420&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java
Wed Dec  5 13:49:34 2012
@@ -70,6 +70,13 @@ public interface ReplicationConsumer
      * @return true if the consumer is connected, false otherwise
      */
     boolean connect( boolean now );
+    
+    
+    /**
+     * Test the connection with the provider. It does connect to the provider, and
+     * tries to bind on it using the consumer credentials.
+     */
+    void ping();
 
 
     /**

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java?rev=1417420&r1=1417419&r2=1417420&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
Wed Dec  5 13:49:34 2012
@@ -19,7 +19,6 @@
  */
 package org.apache.directory.server.ldap.replication.consumer;
 
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -199,8 +198,8 @@ public class ReplicationConsumerImpl imp
         
         prepareSyncSearchRequest();
     }
-
-
+    
+    
     /**
      * Connect to the remote server. Note that a SyncRepl consumer will be connected to only
      * one remote server
@@ -247,11 +246,13 @@ public class ReplicationConsumerImpl imp
                     CONSUMER_LOG.warn( "Failed to bind to the producer {} with the given
bind Dn {}", config.getProducer(), config.getReplUserDn() );
                     LOG.warn( "Failed to bind to the server with the given bind Dn {}", config.getReplUserDn()
);
                     LOG.warn( "", le );
+                    disconnected = true;
                 }
             }
             else
             {
                 CONSUMER_LOG.warn( "Consumer {} cannot connect to producer {}", config.getReplicaId(),
config.getProducer() );
+                disconnected = true;
 
                 return false;
             }
@@ -260,6 +261,7 @@ public class ReplicationConsumerImpl imp
         {
             CONSUMER_LOG.error( "Failed to connect to the server {}, cause : {}", config.getProducer(),
e.getMessage() );
             LOG.error( "Failed to connect to the server {}, cause : {}", config.getProducer(),
e.getMessage() );
+            disconnected = true;
         }
 
         return false;
@@ -667,9 +669,55 @@ public class ReplicationConsumerImpl imp
     /**
      * {@inheritDoc}
      */
+    public void ping()
+    {
+        boolean connected = !disconnected;
+        
+        if ( disconnected )
+        {
+            connected = connect();
+        }
+        
+        if ( connected )
+        {
+            CONSUMER_LOG.debug( "PING : The consumer {} is alive", config.getReplicaId()
);
+
+            try
+            {
+                Entry baseDn = connection.lookup( config.getBaseDn(), "1.1" );
+                
+                if ( baseDn == null )
+                {
+                    // The lookup returned no entry : this is bad, but possible (it is only logged here)
+                    CONSUMER_LOG.debug( "Cannot fetch '{}' from provider for consumer {}",
config.getBaseDn(), config.getReplicaId() );
+                }
+                else
+                {
+                    CONSUMER_LOG.debug( "Fetched '{}' from provider for consumer {}", config.getBaseDn(),
config.getReplicaId() );
+                }
+            }
+            catch ( LdapException le )
+            {
+                // The lookup failed : we must disconnect (ping() tries to reconnect when disconnected is set)
+                disconnect();
+            }
+        }
+        else
+        {
+            CONSUMER_LOG.debug( "PING : The consumer {} cannot be connected", config.getReplicaId()
);
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
     public void stop()
     {
-        disconnect();
+        if ( !disconnected )
+        {
+            disconnect();
+        }
     }
 
 
@@ -822,32 +870,40 @@ public class ReplicationConsumerImpl imp
     private void disconnect()
     {
         disconnected = true;
-
-        try
-        {
-            stopRefreshing();
-
-            connection.unBind();
-            LOG.info( "Unbound from the server {}", config.getProducer() );
-            CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() );
-
-            connection.close();
-            LOG.info( "Connection closed for the server {}", config.getProducer() );
-            CONSUMER_LOG.info( "Connection closed for the server {}", config.getProducer()
);
-
-            connection = null;
-        }
-        catch ( Exception e )
+        
+        if ( connection == null )
         {
-            LOG.error( "Failed to close the connection", e );
+            return;
         }
-        finally
+        
+        if ( connection.isConnected() )
         {
-            // persist the cookie
-            storeCookie();
-            
-            // reset the cookie
-            syncCookie = null;
+            try
+            {
+                stopRefreshing();
+    
+                connection.unBind();
+                LOG.info( "Unbound from the server {}", config.getProducer() );
+                CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() );
+    
+                connection.close();
+                LOG.info( "Connection closed for the server {}", config.getProducer() );
+                CONSUMER_LOG.info( "Connection closed for the server {}", config.getProducer()
);
+    
+                connection = null;
+            }
+            catch ( Exception e )
+            {
+                LOG.error( "Failed to close the connection", e );
+            }
+            finally
+            {
+                // persist the cookie
+                storeCookie();
+                
+                // reset the cookie
+                syncCookie = null;
+            }
         }
     }
 

Modified: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java?rev=1417420&r1=1417419&r2=1417420&view=diff
==============================================================================
--- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java
(original)
+++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java
Wed Dec  5 13:49:34 2012
@@ -227,7 +227,7 @@ public class ClientServerReplicationIT
 
         for ( int i = 0; i < 100; i++ )
         {
-            Thread.sleep( 50 );
+            Thread.sleep( 100 );
 
             exists = session.exists( entryDn );
 
@@ -236,7 +236,7 @@ public class ClientServerReplicationIT
                 return true;
             }
 
-            Thread.sleep( 50 );
+            Thread.sleep( 100 );
         }
 
         dump( session, entryDn );
@@ -425,6 +425,7 @@ public class ClientServerReplicationIT
 
     private Entry restartConsumer( Entry provUser ) throws Exception
     {
+        System.out.println( "------------------------------------- Stop consumer" );
         // Now stop the consumer
         consumerServer.stop();
 
@@ -445,6 +446,7 @@ public class ClientServerReplicationIT
         Thread.sleep( 1000 );
 
         // Restart the consumer
+        System.out.println( "------------------------------------- Start consumer" );
         consumerServer.start();
 
         assertTrue( checkEntryDeletion( consumerSession, deletedUserDn ) );

Modified: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java?rev=1417420&r1=1417419&r2=1417420&view=diff
==============================================================================
--- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java
(original)
+++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java
Wed Dec  5 13:49:34 2012
@@ -97,6 +97,9 @@ public class MockSyncReplConsumer implem
 {
     /** the logger */
     private static final Logger LOG = LoggerFactory.getLogger( MockSyncReplConsumer.class
);
+    
+    /** A dedicated logger for the consumer */
+    private static final Logger CONSUMER_LOG = LoggerFactory.getLogger( "CONSUMER_LOG" );
 
     /** The codec */
     private LdapApiService ldapCodecService = LdapApiServiceFactory.getSingleton();
@@ -228,22 +231,40 @@ public class MockSyncReplConsumer implem
                 connection.addConnectionClosedEventListener( this );
             }
 
-            // Do a bind
-            try
+            // Try to connect
+            if ( connection.connect() )
             {
-                connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword()
) );
-                disconnected = false;
-                return true;
+                CONSUMER_LOG.info( "Consumer {} connected to producer {}", config.getReplicaId(),
config.getProducer() );
+
+                // Do a bind
+                try
+                {
+                    connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword()
) );
+                    disconnected = false;
+                    
+                    return true;
+                }
+                catch ( LdapException le )
+                {
+                    CONSUMER_LOG.warn( "Failed to bind to the producer {} with the given
bind Dn {}", config.getProducer(), config.getReplUserDn() );
+                    LOG.warn( "Failed to bind to the server with the given bind Dn {}", config.getReplUserDn()
);
+                    LOG.warn( "", le );
+                    disconnected = true;
+                }
             }
-            catch ( LdapException le )
+            else
             {
-                LOG.warn( "Failed to bind to the server with the given bind Dn {}", config.getReplUserDn()
);
-                LOG.warn( "", le );
+                CONSUMER_LOG.warn( "Consumer {} cannot connect to producer {}", config.getReplicaId(),
config.getProducer() );
+                disconnected = true;
+
+                return false;
             }
         }
         catch ( Exception e )
         {
-            LOG.error( "Failed to bind with the given bindDN and credentials", e );
+            CONSUMER_LOG.error( "Failed to connect to the server {}, cause : {}", config.getProducer(),
e.getMessage() );
+            LOG.error( "Failed to connect to the server {}, cause : {}", config.getProducer(),
e.getMessage() );
+            disconnected = true;
         }
 
         return false;
@@ -535,6 +556,49 @@ public class MockSyncReplConsumer implem
     /**
      * {@inheritDoc}
      */
+    public void ping()
+    {
+        boolean connected = !disconnected;
+        
+        if ( disconnected )
+        {
+            connected = connect();
+        }
+
+        if ( connected )
+        {
+            CONSUMER_LOG.debug( "PING : The consumer {} is alive", config.getReplicaId()
);
+
+            try
+            {
+                Entry baseDn = connection.lookup( config.getBaseDn(), "1.1" );
+                
+                if ( baseDn == null )
+                {
+                    // The lookup returned no entry : this is bad, but possible (it is only logged here)
+                    CONSUMER_LOG.debug( "Cannot fetch '{}' from provider for consumer {}",
config.getBaseDn(), config.getReplicaId() );
+                }
+                else
+                {
+                    CONSUMER_LOG.debug( "Fetched '{}' from provider for consumer {}", config.getBaseDn(),
config.getReplicaId() );
+                }
+            }
+            catch ( LdapException le )
+            {
+                // The lookup failed : we must disconnect (ping() tries to reconnect when disconnected is set)
+                disconnect();
+            }
+        }
+        else
+        {
+            CONSUMER_LOG.debug( "PING : The consumer {} cannot be connected", config.getReplicaId()
);
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
     public boolean connect( boolean now )
     {
         boolean connected = false;
@@ -572,8 +636,11 @@ public class MockSyncReplConsumer implem
      */
     public void stop()
     {
-        disconnect();
-        nbAdded.getAndSet( 0 );
+        if ( !disconnected )
+        {
+            disconnect();
+            nbAdded.getAndSet( 0 );
+        }
     }
 
 
@@ -674,33 +741,41 @@ public class MockSyncReplConsumer implem
     public void disconnect()
     {
         disconnected = true;
-
-        try
-        {
-            if ( refreshThread != null )
-            {
-                refreshThread.stopRefreshing();
-            }
-
-            connection.unBind();
-            LOG.info( "Unbound from the server {}", config.getRemoteHost() );
-
-            connection.close();
-            LOG.info( "Connection closed for the server {}", config.getRemoteHost() );
-
-            connection = null;
-        }
-        catch ( Exception e )
+        
+        if ( connection == null )
         {
-            LOG.error( "Failed to close the connection", e );
+            return;
         }
-        finally
+        
+        if ( connection.isConnected() )
         {
-            // persist the cookie
-            storeCookie();
-            
-            // reset the cookie
-            syncCookie = null;
+            try
+            {
+                if ( refreshThread != null )
+                {
+                    refreshThread.stopRefreshing();
+                }
+    
+                connection.unBind();
+                LOG.info( "Unbound from the server {}", config.getRemoteHost() );
+    
+                connection.close();
+                LOG.info( "Connection closed for the server {}", config.getRemoteHost() );
+    
+                connection = null;
+            }
+            catch ( Exception e )
+            {
+                LOG.error( "Failed to close the connection", e );
+            }
+            finally
+            {
+                // persist the cookie
+                storeCookie();
+                
+                // reset the cookie
+                syncCookie = null;
+            }
         }
     }
 



Mime
View raw message