directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From elecha...@apache.org
Subject svn commit: r1416619 - in /directory/apacheds/trunk: protocol-ldap/src/main/java/org/apache/directory/server/ldap/ protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ server-integ/src/test/java/org/apache/directory/server...
Date Mon, 03 Dec 2012 18:24:53 GMT
Author: elecharny
Date: Mon Dec  3 18:24:52 2012
New Revision: 1416619

URL: http://svn.apache.org/viewvc?rev=1416619&view=rev
Log:
Refactored the way we were handling threads in replication. 

Added:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationStatusEnum.java
  (with props)
Modified:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
    directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java
    directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java?rev=1416619&r1=1416618&r2=1416619&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java Mon Dec  3 18:24:52 2012
@@ -67,6 +67,7 @@ import org.apache.directory.server.ldap.
 import org.apache.directory.server.ldap.handlers.response.SearchResultReferenceHandler;
 import org.apache.directory.server.ldap.handlers.ssl.LdapsInitializer;
 import org.apache.directory.server.ldap.replication.consumer.ReplicationConsumer;
+import org.apache.directory.server.ldap.replication.consumer.ReplicationStatusEnum;
 import org.apache.directory.server.ldap.replication.provider.ReplicationRequestHandler;
 import org.apache.directory.server.protocol.shared.DirectoryBackedService;
 import org.apache.directory.server.protocol.shared.transport.TcpTransport;
@@ -687,16 +688,46 @@ public class LdapServer extends Director
         {
             for ( final ReplicationConsumer consumer : replConsumers )
             {
+                consumer.init( getDirectoryService() );
+
                 Runnable consumerTask = new Runnable()
                 {
                     public void run()
                     {
                         try
                         {
-                            LOG.info( "starting the replication consumer with {}", consumer );
-                            CONSUMER_LOG.info( "starting the replication consumer with {}", consumer );
-                            consumer.init( getDirectoryService() );
-                            consumer.start( true );
+                            boolean stopped = false;
+                            
+                            while ( !stopped )
+                            {
+                                LOG.info( "starting the replication consumer with {}", consumer );
+                                CONSUMER_LOG.info( "starting the replication consumer with {}", consumer );
+                                boolean isConnected = consumer.connect( ReplicationConsumer.NOW );
+                                
+                                if ( isConnected )
+                                {
+                                    // We are now connected, start the replication
+                                    ReplicationStatusEnum status = null;
+                                    
+                                    do {
+                                        status = consumer.startSync();
+                                    } while ( status == ReplicationStatusEnum.REFRESH_REQUIRED );
+                                    
+                                    switch ( status )
+                                    {
+                                        case STOPPED :
+                                            stopped = true;
+                                            break;
+                                            
+                                        case CANCELLED :
+                                        case DISCONNECTED :
+                                        case INTERRUPTED :
+                                        case UNKOWN_ERROR :
+                                            // Loop on connect
+                                            break;
+                                    }
+                                }
+                            }
                         }
                         catch ( Exception e )
                         {

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java?rev=1416619&r1=1416618&r2=1416619&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumer.java Mon Dec  3 18:24:52 2012
@@ -34,6 +34,12 @@ import org.apache.directory.server.ldap.
  */
 public interface ReplicationConsumer
 {
+    /** A flag we used when we want to connect without waiting */
+    boolean NOW = true;
+    
+    /** A flag we used when we want to connect after a waiting delay */
+    boolean DIFFERED = false;
+    
     /**
      * Sets the configuration of the consumer
      * 
@@ -58,11 +64,12 @@ public interface ReplicationConsumer
 
 
     /**
-     * Starts the consumer, connection immediately or wait before reconnection
+     * Connect the consumer, connection immediately or wait before reconnection
      * 
      * @param now A param that tells the consumer to connect immediately or not
+     * @return true if the consumer is connected, false otherwise
      */
-    void start( boolean now );
+    boolean connect( boolean now );
 
 
     /**
@@ -75,4 +82,10 @@ public interface ReplicationConsumer
      * @return the identifier of the consumer instance
      */
     String getId();
+
+    
+    /**
+     * Starts the synchronization operation
+     */
+    ReplicationStatusEnum startSync();
 }

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java?rev=1416619&r1=1416618&r2=1416619&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java Mon Dec  3 18:24:52 2012
@@ -46,6 +46,7 @@ import org.apache.directory.server.core.
 import org.apache.directory.server.ldap.LdapProtocolUtils;
 import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig;
 import org.apache.directory.server.ldap.replication.SyncReplConfiguration;
+import org.apache.directory.shared.asn1.DecoderException;
 import org.apache.directory.shared.ldap.codec.controls.manageDsaIT.ManageDsaITDecorator;
 import org.apache.directory.shared.ldap.extras.controls.SyncDoneValue;
 import org.apache.directory.shared.ldap.extras.controls.SyncInfoValue;
@@ -128,6 +129,9 @@ public class ReplicationConsumerImpl imp
     /** flag to indicate whether the consumer was disconnected */
     private boolean disconnected;
 
+    /** A field used to tell the thread it should stop */
+    private volatile boolean stop = false;
+
     /** the core session */
     private CoreSession session;
 
@@ -141,9 +145,6 @@ public class ReplicationConsumerImpl imp
             SchemaConstants.COLLECTIVE_ATTRIBUTE_SUBENTRIES_AT
         };
 
-    /** A thread used to refresh in refreshOnly mode */
-    private RefresherThread refreshThread;
-
     /** the cookie that was saved last time */
     private byte[] lastSavedCookie;
 
@@ -232,7 +233,7 @@ public class ReplicationConsumerImpl imp
             // Try to connect
             if ( connection.connect() )
             {
-                CONSUMER_LOG.info( "Consumer {} connected to producer{}", config.getReplicaId(), config.getProducer() );
+                CONSUMER_LOG.info( "Consumer {} connected to producer {}", config.getReplicaId(), config.getProducer() );
 
                 // Do a bind
                 try
@@ -258,8 +259,8 @@ public class ReplicationConsumerImpl imp
         }
         catch ( Exception e )
         {
-            LOG.error( "Failed to connect to the server {}:{}", providerHost, String.valueOf( port ) );
-            LOG.error( "", e );
+            CONSUMER_LOG.error( "Failed to connect to the server {}, cause : {}", config.getProducer(), e.getMessage() );
+            LOG.error( "Failed to connect to the server {}, cause : {}", config.getProducer(), e.getMessage() );
         }
 
         return false;
@@ -323,9 +324,18 @@ public class ReplicationConsumerImpl imp
     }
 
 
+    /**
+     * Process a SearchResultEntry received from a consumer. We have to handle all the 
+     * cases :
+     * - Add
+     * - Modify
+     * - Moddn
+     * - Delete
+     * - Present
+     * @param syncResult
+     */
     private void handleSearchResultEntry( SearchResultEntry syncResult )
     {
-
         LOG.debug( "------------- starting handleSearchResult ------------" );
 
         SyncStateValue syncStateCtrl = ( SyncStateValue ) syncResult.getControl( SyncStateValue.OID
);
@@ -337,6 +347,7 @@ public class ReplicationConsumerImpl imp
             // lock on UUID to serialize the updates when there are multiple consumers
             // connected to several producers and to the *same* base/partition
             Object lock = getLockFor( uuid );
+            
             synchronized ( lock )
             {
                 int rid = -1;
@@ -345,8 +356,7 @@ public class ReplicationConsumerImpl imp
                 {
                     syncCookie = syncStateCtrl.getCookie();
                     rid = LdapProtocolUtils.getReplicaId( Strings.utf8ToString( syncCookie
) );
-                    LOG.debug( "assigning the cookie from sync state value control: "
-                        + Strings.utf8ToString(syncCookie) );
+                    LOG.debug( "assigning the cookie from sync state value control: {}",
Strings.utf8ToString(syncCookie) );
                 }
                 
                 SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
@@ -361,78 +371,79 @@ public class ReplicationConsumerImpl imp
 
                 Dn remoteDn = remoteEntry.getDn();
 
-            switch ( state )
-            {
-                case ADD:
-                    boolean remoteDnExist = false;
-                    
-                    try
-                    {
-                        remoteDnExist = session.exists( remoteDn );
-                    }
-                    catch ( LdapNoSuchObjectException lnsoe )
-                    {
-                        CONSUMER_LOG.error( lnsoe.getMessage() );
-                    }
-                    
-                    if ( !remoteDnExist)
-                    {
-                        LOG.debug( "adding entry with dn {}", remoteDn );
-                        LOG.debug( remoteEntry.toString() );
-                        AddOperationContext addContext = new AddOperationContext( session,
remoteEntry );
-                        addContext.setReplEvent( true );
-                        addContext.setRid( rid );
+                switch ( state )
+                {
+                    case ADD:
+                        boolean remoteDnExist = false;
+                        
+                        try
+                        {
+                            remoteDnExist = session.exists( remoteDn );
+                        }
+                        catch ( LdapNoSuchObjectException lnsoe )
+                        {
+                            CONSUMER_LOG.error( lnsoe.getMessage() );
+                        }
                         
-                        OperationManager operationManager = directoryService.getOperationManager();
-                        operationManager.add( addContext );
-                    }
-                    else
-                    {
-                        LOG.debug( "updating entry in refreshOnly mode {}", remoteDn );
+                        if ( !remoteDnExist)
+                        {
+                            LOG.debug( "adding entry with dn {}", remoteDn );
+                            LOG.debug( remoteEntry.toString() );
+                            AddOperationContext addContext = new AddOperationContext( session,
remoteEntry );
+                            addContext.setReplEvent( true );
+                            addContext.setRid( rid );
+                            
+                            OperationManager operationManager = directoryService.getOperationManager();
+                            operationManager.add( addContext );
+                        }
+                        else
+                        {
+                            LOG.debug( "updating entry in refreshOnly mode {}", remoteDn
);
+                            modify( remoteEntry, rid );
+                        }
+    
+                        break;
+    
+                    case MODIFY:
+                        LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName()
);
                         modify( remoteEntry, rid );
-                    }
-
-                    break;
-
-                case MODIFY:
-                    LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName()
);
-                    modify( remoteEntry, rid );
-
-                    break;
-
-                case MODDN:
-                    String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID()
).toString();
-                    applyModDnOperation( remoteEntry, entryUuid, rid );
-
-                    break;
-
-                case DELETE:
-                    LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName()
);
-                    if ( !session.exists( remoteDn ) )
-                    {
-                        LOG.debug( "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete", remoteDn );
-                    }
-                    else
-                    {
-                        // incase of a MODDN operation resulting in a branch to be moved
out of scope
-                        // ApacheDS replication provider sends a single delete event on the
Dn of the moved branch
-                        // so the branch needs to be recursively deleted here
-                        deleteRecursive( remoteEntry.getDn(), null );
-                    }
-
-                    break;
-
-                case PRESENT:
-                    LOG.debug( "entry present {}", remoteEntry );
-                    break;
-            }
+    
+                        break;
+    
+                    case MODDN:
+                        String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID()
).toString();
+                        applyModDnOperation( remoteEntry, entryUuid, rid );
+    
+                        break;
+    
+                    case DELETE:
+                        LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName()
);
+                        
+                        if ( !session.exists( remoteDn ) )
+                        {
+                            LOG.debug( "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete", remoteDn );
+                        }
+                        else
+                        {
+                            // incase of a MODDN operation resulting in a branch to be moved
out of scope
+                            // ApacheDS replication provider sends a single delete event
on the Dn of the moved branch
+                            // so the branch needs to be recursively deleted here
+                            deleteRecursive( remoteEntry.getDn(), null );
+                        }
+    
+                        break;
+    
+                    case PRESENT:
+                        LOG.debug( "entry present {}", remoteEntry );
+                        break;
+                }
 
-            // store the cookie only if the above operation was successful
-            if ( syncStateCtrl.getCookie() != null )
-            {
-                storeCookie();
+                // store the cookie only if the above operation was successful
+                if ( syncStateCtrl.getCookie() != null )
+                {
+                    storeCookie();
+                }
             }
-          }
         }
         catch ( Exception e )
         {
@@ -464,6 +475,12 @@ public class ReplicationConsumerImpl imp
 
             byte[] cookie = syncInfoValue.getCookie();
 
+            
+            if ( CONSUMER_LOG.isDebugEnabled() )
+            {
+                CONSUMER_LOG.debug( "Received a SyncInfoValue from producer {} : {}", config.getProducer(),
syncInfoValue );
+            }
+
             int replicaId = -1;
             
             if ( cookie != null )
@@ -498,6 +515,7 @@ public class ReplicationConsumerImpl imp
         catch ( Exception de )
         {
             LOG.error( "Failed to handle syncinfo message", de );
+            CONSUMER_LOG.error( "Failed to handle syncinfo message", de );
         }
 
         LOG.debug( ".................... END handleSyncInfo ..............." );
@@ -511,46 +529,71 @@ public class ReplicationConsumerImpl imp
     {
         CONSUMER_LOG.debug( "Consumer {} session with {} has been closed ", config.getReplicaId(),
config.getProducer() );
         
-        if ( disconnected )
-        {
-            return;
-        }
-
-        start( false );
+        return;
     }
 
 
     /**
      * Starts the synchronization operation
      */
-    private void startSync()
+    public ReplicationStatusEnum startSync()
     {
         CONSUMER_LOG.debug( "Starting the SyncRepl process for consumer {}", config.getReplicaId()
);
 
         // read the cookie if persisted
         readCookie();
 
-        CONSUMER_LOG.debug( "Cookie read : '{}'", syncCookie );
-
         if ( config.isRefreshNPersist() )
         {
             try
             {
                 LOG.debug( "==================== Refresh And Persist ==========" );
-                doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, false );
+
+                return doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, false );
             }
             catch ( Exception e )
             {
                 LOG.error( "Failed to sync with refreshAndPersist mode", e );
+                return ReplicationStatusEnum.UNKOWN_ERROR;
             }
         }
         else
         {
-            refreshThread = new RefresherThread();
-            refreshThread.start();
+            return doRefreshOnly();
         }
     }
+    
+    
+    private ReplicationStatusEnum doRefreshOnly()
+    {
+        while ( !stop )
+        {
+            LOG.debug( "==================== Refresh Only ==========" );
+
+            try
+            {
+                doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, false );
 
+                LOG.info( "--------------------- Sleep for a little while ------------------"
);
+                Thread.sleep( config.getRefreshInterval() );
+                LOG.debug( "--------------------- syncing again ------------------" );
+
+            }
+            catch ( InterruptedException ie )
+            {
+                LOG.warn( "refresher thread interrupted" );
+                return ReplicationStatusEnum.INTERRUPTED;
+            }
+            catch ( Exception e )
+            {
+                LOG.error( "Failed to sync with refresh only mode", e );
+                return ReplicationStatusEnum.UNKOWN_ERROR;
+            }
+        }
+
+        return ReplicationStatusEnum.STOPPED;
+    }
+    
 
     /** 
      * {@inheritDoc}
@@ -564,7 +607,7 @@ public class ReplicationConsumerImpl imp
     /**
      * {@inheritDoc}
      */
-    public void start( boolean now )
+    public boolean connect( boolean now )
     {
         boolean connected = false;
 
@@ -591,8 +634,11 @@ public class ReplicationConsumerImpl imp
 
             connected = connect();
         }
-
-        startSync();
+        
+        // TODO : we may have cases were we get here with the connected flag to false. With the above
+        // code, thi sis not possible
+        
+        return connected;
     }
 
 
@@ -612,8 +658,8 @@ public class ReplicationConsumerImpl imp
     {
         return String.valueOf( getConfig().getReplicaId() );
     }
-
-
+    
+    
     /**
      * Performs a search on connection with updated syncRequest control. The provider
      * will initiate an UpdateContant or an initContent depending on the current consumer
@@ -626,9 +672,10 @@ public class ReplicationConsumerImpl imp
      *
      * @param syncType The synchornization type, either REFRESH_ONLY or REFRESH_AND_PERSIST
      * @param reloadHint A flag used to tell the server that we want a reload
+     * @return The replication status
      * @throws Exception in case of any problems encountered while searching
      */
-    private void doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception
+    private ReplicationStatusEnum doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception
     {
         CONSUMER_LOG.debug( "Starting synchronization mode {}, reloadHint {}", syncType,
reloadHint );
         // Prepare the Syncrepl Request
@@ -685,12 +732,14 @@ public class ReplicationConsumerImpl imp
         {
             
             CONSUMER_LOG.debug( "Search sync on {} has been canceled ", config.getProducer(),
sf.getCause() );
-            return;
+            
+            return ReplicationStatusEnum.CANCELLED;
         }
         else if ( disconnected )
         {
             CONSUMER_LOG.debug( "Disconnected from {}", config.getProducer() );
-            return;
+            
+            return ReplicationStatusEnum.DISCONNECTED;
         }
         else
         {
@@ -705,14 +754,11 @@ public class ReplicationConsumerImpl imp
                 CONSUMER_LOG.warn( "The base Dn {} is not found on provider {}", config.getBaseDn(),
config.getProducer() );
                 LOG.warn( "The base Dn {} is not found on provider {}", config.getBaseDn(),
config.getProducer() );
     
-                if ( syncType == SynchronizationModeEnum.REFRESH_AND_PERSIST )
-                {
-                    CONSUMER_LOG.warn( "Disconnecting the Refresh&Persist consumer from
provider {}", config.getProducer() );
-                    LOG.warn( "Disconnecting the Refresh&Persist consumer from provider
{}", config.getProducer() );
-                    disconnect();
-                    
-                    return;
-                }
+                CONSUMER_LOG.warn( "Disconnecting the Refresh&Persist consumer from provider
{}", config.getProducer() );
+                LOG.warn( "Disconnecting the Refresh&Persist consumer from provider {}",
config.getProducer() );
+                disconnect();
+                
+                return ReplicationStatusEnum.DISCONNECTED;
             }
             else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED )
             {
@@ -736,12 +782,13 @@ public class ReplicationConsumerImpl imp
                 removeCookie();
                 
                 CONSUMER_LOG.debug( "Re-doing a syncRefresh from producer {}", config.getProducer()
);
-                // Remove this recursive call...
-                doSyncSearch( syncType, true );
+                
+                return ReplicationStatusEnum.REFRESH_REQUIRED;
             }
             else
             {
                 CONSUMER_LOG.debug( "Got result code {} from producer {}. Replication stopped",
resultCode, config.getProducer() );
+                return ReplicationStatusEnum.UNKOWN_ERROR;
             }
         }
     }
@@ -756,10 +803,7 @@ public class ReplicationConsumerImpl imp
 
         try
         {
-            if ( refreshThread != null )
-            {
-                refreshThread.stopRefreshing();
-            }
+            stopRefreshing();
 
             connection.unBind();
             LOG.info( "Unbound from the server {}", config.getProducer() );
@@ -1221,51 +1265,11 @@ public class ReplicationConsumerImpl imp
         cursor.close();
     }
 
-    /**
-     * A Thread implementation for synchronizing the DIT in refreshOnly mode
-     */
-    private class RefresherThread extends Thread
-    {
-        /** A field used to tell the thread it should stop */
-        private volatile boolean stop = false;
-        
-        public RefresherThread()
-        {
-            setDaemon( true );
-        }
-
-
-        public void run()
-        {
-            while ( !stop )
-            {
-                LOG.debug( "==================== Refresh Only ==========" );
-
-                try
-                {
-                    doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, false );
-
-                    LOG.info( "--------------------- Sleep for a little while ------------------"
);
-                    Thread.sleep( config.getRefreshInterval() );
-                    LOG.debug( "--------------------- syncing again ------------------" );
-
-                }
-                catch ( InterruptedException ie )
-                {
-                    LOG.warn( "refresher thread interrupted" );
-                }
-                catch ( Exception e )
-                {
-                    LOG.error( "Failed to sync with refresh only mode", e );
-                }
-            }
-        }
 
 
-        public void stopRefreshing()
-        {
-            stop = true;
-        }
+    public void stopRefreshing()
+    {
+        stop = true;
     }
     
     

Added: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationStatusEnum.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationStatusEnum.java?rev=1416619&view=auto
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationStatusEnum.java (added)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationStatusEnum.java Mon Dec  3 18:24:52 2012
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.directory.server.ldap.replication.consumer;
+
+/**
+ * This enum is used to describe the various status of the replication.
+ * 
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ */
+public enum ReplicationStatusEnum
+{
+    /** We get disconnected from the provider */
+    DISCONNECTED,
+    
+    /** A full refresh should be done */
+    REFRESH_REQUIRED,
+    
+    /** The replication loop has been interrupted */
+    INTERRUPTED,
+    
+    /** The replication has been stopped */
+    STOPPED,
+    
+    /** The replication has been cancelled */
+    CANCELLED,
+    
+    /** We have got an unknown error */
+    UNKOWN_ERROR;
+}

Propchange: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationStatusEnum.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java?rev=1416619&r1=1416618&r2=1416619&view=diff
==============================================================================
--- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java (original)
+++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java Mon Dec  3 18:24:52 2012
@@ -304,7 +304,8 @@ public class ClientInitialRefreshIT
                     DirectoryService directoryService = new MockDirectoryService();
                     directoryService.setSchemaManager( schemaManager );
                     ( ( MockSyncReplConsumer ) syncreplClient ).init( directoryService );
-                    syncreplClient.start( true );
+                    syncreplClient.connect( true );
+                    syncreplClient.startSync();
                 }
                 catch ( Exception e )
                 {
@@ -332,7 +333,8 @@ public class ClientInitialRefreshIT
             {
                 try
                 {
-                    consumer.start( true );
+                    consumer.connect( true );
+                    consumer.startSync();
                 }
                 catch ( Exception e )
                 {

Modified: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java?rev=1416619&r1=1416618&r2=1416619&view=diff
==============================================================================
--- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java (original)
+++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java Mon Dec  3 18:24:52 2012
@@ -41,6 +41,7 @@ import org.apache.directory.server.ldap.
 import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig;
 import org.apache.directory.server.ldap.replication.SyncReplConfiguration;
 import org.apache.directory.server.ldap.replication.consumer.ReplicationConsumer;
+import org.apache.directory.server.ldap.replication.consumer.ReplicationStatusEnum;
 import org.apache.directory.shared.ldap.codec.api.LdapApiService;
 import org.apache.directory.shared.ldap.codec.api.LdapApiServiceFactory;
 import org.apache.directory.shared.ldap.extras.controls.SyncDoneValue;
@@ -102,6 +103,12 @@ public class MockSyncReplConsumer implem
 
     /** the syncrepl configuration */
     private SyncReplConfiguration config;
+    
+    /** A field used to tell the thread it should stop */
+    private volatile boolean stop = false;
+
+    /** A mutex used to make the thread sleeping for a moment */
+    private final Object mutex = new Object();
 
     /** the sync cookie sent by the server */
     private byte[] syncCookie;
@@ -460,7 +467,7 @@ public class MockSyncReplConsumer implem
     /**
      * starts the synchronization operation
      */
-    public void startSync()
+    public ReplicationStatusEnum startSync()
     {
         // read the cookie if persisted
         readCookie();
@@ -470,21 +477,52 @@ public class MockSyncReplConsumer implem
             try
             {
                 LOG.debug( "==================== Refresh And Persist ==========" );
-                doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, false );
+
+                return doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, false );
             }
             catch ( Exception e )
             {
                 LOG.error( "Failed to sync with refreshAndPersist mode", e );
+                return ReplicationStatusEnum.UNKOWN_ERROR;
             }
         }
         else
         {
-            refreshThread = new RefresherThread();
-            refreshThread.start();
+            return doRefreshOnly();
         }
     }
 
+    private ReplicationStatusEnum doRefreshOnly()
+    {
+        while ( !stop )
+        {
+            LOG.debug( "==================== Refresh Only ==========" );
+
+            try
+            {
+                doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, false );
+
+                LOG.info( "--------------------- Sleep for a little while ------------------" );
+                mutex.wait( config.getRefreshInterval() );
+                LOG.debug( "--------------------- syncing again ------------------" );
+
+            }
+            catch ( InterruptedException ie )
+            {
+                LOG.warn( "refresher thread interrupted" );
+                return ReplicationStatusEnum.INTERRUPTED;
+            }
+            catch ( Exception e )
+            {
+                LOG.error( "Failed to sync with refresh only mode", e );
+                return ReplicationStatusEnum.UNKOWN_ERROR;
+            }
+        }
+
+        return ReplicationStatusEnum.STOPPED;
+    }
 
+    
     /**
      * {@inheritDoc}
      */
@@ -497,10 +535,35 @@ public class MockSyncReplConsumer implem
     /**
      * {@inheritDoc}
      */
-    public void start( boolean now )
+    public boolean connect( boolean now )
     {
-        connect();
-        startSync();
+        boolean connected = false;
+
+        if ( now )
+        {
+            connected = connect();
+        }
+        
+        while ( !connected )
+        {
+            try
+            {
+                // try to establish a connection for every 5 seconds
+                Thread.sleep( 5000 );
+            }
+            catch ( InterruptedException e )
+            {
+                LOG.warn( "Consumer {} Interrupted while trying to reconnect to the provider {}",
+                    config.getReplicaId(), config.getProducer() );
+            }
+
+            connected = connect();
+        }
+        
+        // TODO : we may have cases were we get here with the connected flag to false. With the above
+        // code, thi sis not possible
+        
+        return connected;
     }
 
 
@@ -528,7 +591,7 @@ public class MockSyncReplConsumer implem
      *
      * @throws Exception in case of any problems encountered while searching
      */
-    private void doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception
+    private ReplicationStatusEnum doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception
     {
         SyncRequestValue syncReq = new SyncRequestValueDecorator( ldapCodecService );
 
@@ -574,15 +637,16 @@ public class MockSyncReplConsumer implem
         {
             // log the error and handle it appropriately
             LOG.warn( "given replication base Dn {} is not found on provider", config.getBaseDn() );
-            if ( syncType == SynchronizationModeEnum.REFRESH_AND_PERSIST )
-            {
-                LOG.warn( "disconnecting the consumer running in refreshAndPersist mode from the provider" );
-                disconnect();
-            }
+            
+            LOG.warn( "disconnecting the consumer running in refreshAndPersist mode from the provider" );
+            disconnect();
+            
+            return ReplicationStatusEnum.DISCONNECTED;
         }
         else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED )
         {
             LOG.info( "unable to perform the content synchronization cause E_SYNC_REFRESH_REQUIRED" );
+            
             try
             {
                 deleteRecursive( new Dn( config.getBaseDn() ), null );
@@ -597,7 +661,12 @@ public class MockSyncReplConsumer implem
             }
 
             removeCookie();
-            doSyncSearch( syncType, true );
+            
+            return ReplicationStatusEnum.REFRESH_REQUIRED;
+        } 
+        else
+        {
+            return ReplicationStatusEnum.UNKOWN_ERROR;
         }
     }
 
@@ -968,12 +1037,6 @@ public class MockSyncReplConsumer implem
      */
     private class RefresherThread extends Thread
     {
-        /** A field used to tell the thread it should stop */
-        private volatile boolean stop = false;
-
-        /** A mutex used to make the thread sleeping for a moment */
-        private final Object mutex = new Object();
-
 
         public RefresherThread()
         {



Mime
View raw message