directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kayyag...@apache.org
Subject svn commit: r943893 - in /directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap: LdapServer.java replication/SyncReplConsumer.java replication/SyncreplConfiguration.java
Date Thu, 13 May 2010 14:13:20 GMT
Author: kayyagari
Date: Thu May 13 14:13:19 2010
New Revision: 943893

URL: http://svn.apache.org/viewvc?rev=943893&view=rev
Log:
o integrated syncrepl consumer's start/stop cycle with LdapServer
o added a cookie saver thread to syncrepl consumer implementation

Modified:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConsumer.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java?rev=943893&r1=943892&r2=943893&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/LdapServer.java
Thu May 13 14:13:19 2010
@@ -57,6 +57,8 @@ import org.apache.directory.server.ldap.
 import org.apache.directory.server.ldap.handlers.ssl.LdapsInitializer;
 import org.apache.directory.server.ldap.replication.ReplicationProvider;
 import org.apache.directory.server.ldap.replication.ReplicationSystem;
+import org.apache.directory.server.ldap.replication.SyncReplConsumer;
+import org.apache.directory.server.ldap.replication.SyncreplConfiguration;
 import org.apache.directory.server.protocol.shared.DirectoryBackedService;
 import org.apache.directory.server.protocol.shared.transport.TcpTransport;
 import org.apache.directory.server.protocol.shared.transport.Transport;
@@ -225,6 +227,10 @@ public class LdapServer extends Director
     
     private ReplicationProvider replicationProvider;
     
+    private List<SyncreplConfiguration> providerConfigs;
+    
+    private List<SyncReplConsumer> replConsumers;
+    
     /**
      * Creates an LDAP protocol provider.
      */
@@ -445,16 +451,18 @@ public class LdapServer extends Director
              * access to the DirectoryServer instance.
              */ 
             installDefaultHandlers();      
-
-            if( replicationProvider != null )
-            {
-                replicationProvider.init( this );
-                ( ( SearchHandler ) getSearchHandler() ).setReplicationProvider( replicationProvider
);
-            }
             
             startNetwork( transport, chain );
         }
         
+        if( replicationProvider != null )
+        {
+            replicationProvider.init( this );
+            ( ( SearchHandler ) getSearchHandler() ).setReplicationProvider( replicationProvider
);
+        }
+        
+        startConsumers();
+
         started = true;
         
         LOG.info( "Ldap service started." );
@@ -527,6 +535,8 @@ public class LdapServer extends Director
                     replicationProvider.stop();
                 }
             }
+            
+            stopConsumers();
         }
         catch ( Exception e )
         {
@@ -601,6 +611,62 @@ public class LdapServer extends Director
     }
 
 
+    /**
+     * starts the replication consumers
+     */
+    private void startConsumers() throws Exception
+    {
+        if( providerConfigs != null )
+        {
+            replConsumers = new ArrayList<SyncReplConsumer>( providerConfigs.size()
);
+            
+            for( final SyncreplConfiguration config : providerConfigs )
+            {
+                Runnable consumerTask = new Runnable()
+                {
+                    public void run()
+                    {
+                        try
+                        {
+                            SyncReplConsumer consumer = new SyncReplConsumer();
+                            LOG.info( "starting the replication consumer with config {}",
config );
+                            consumer.init( getDirectoryService(), config );
+                            consumer.connect();
+                            replConsumers.add( consumer );
+                            consumer.startSync();
+                        }
+                        catch( Exception e )
+                        {
+                            LOG.error( "Failed to start the consumer with config {}", config
);
+                            throw new RuntimeException( e );
+                        }
+                    }
+                };
+                
+                Thread consumerThread = new Thread( consumerTask );
+                consumerThread.setDaemon( true );
+                consumerThread.start();
+            }
+        }
+    }
+
+
+    /**
+     * stops the replication consumers
+     */
+    private void stopConsumers()
+    {
+        if( replConsumers != null )
+        {
+            for( SyncReplConsumer consumer : replConsumers )
+            {
+                LOG.info( "stopping the consumer with id {}", consumer.getConfig().getReplicaId()
);
+                consumer.disconnet();
+            }
+        }
+    }
+    
+    
     public String getName()
     {
         return SERVICE_NAME;
@@ -1300,6 +1366,12 @@ public class LdapServer extends Director
     }
 
 
+    public void setReplProviderConfigs( List<SyncreplConfiguration> providerConfigs
)
+    {
+        this.providerConfigs = providerConfigs;
+    }
+
+
     /**
      * @see Object#toString()
      */

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConsumer.java?rev=943893&r1=943892&r2=943893&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConsumer.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConsumer.java
Thu May 13 14:13:19 2010
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -141,6 +142,8 @@ public class SyncReplConsumer
 
     private RefresherThread refreshThread;
 
+    /** the cookie that was saved last time */
+    private byte[] lastSavedCookie;
 
     /**
      * @return the config
@@ -151,17 +154,9 @@ public class SyncReplConsumer
     }
 
 
-    /**
-     * @param config the config to set
-     */
-    public void setConfig( SyncreplConfiguration config )
+    public void init( DirectoryService directoryservice, SyncreplConfiguration config ) throws
Exception
     {
         this.config = config;
-    }
-
-
-    public void init( DirectoryService directoryservice ) throws Exception
-    {
         this.directoryService = directoryservice;
 
         File cookieDir = new File( directoryservice.getWorkingDirectory(), "cookies" );
@@ -172,6 +167,8 @@ public class SyncReplConsumer
         session = directoryService.getAdminSession();
 
         schemaManager = directoryservice.getSchemaManager();
+        
+        prepareSyncSearchRequest();
     }
 
 
@@ -235,8 +232,7 @@ public class SyncReplConsumer
         searchRequest.setSizeLimit( config.getSearchSizeLimit() );
         searchRequest.setTimeLimit( config.getSearchTimeout() );
 
-        // the only valid values are NEVER_DEREF_ALIASES and DEREF_FINDING_BASE_OBJ
-        searchRequest.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES );
+        searchRequest.setDerefAliases( config.getAliasDerefMode() );
         searchRequest.setScope( config.getSearchScope() );
         searchRequest.setTypesOnly( false );
 
@@ -485,6 +481,7 @@ public class SyncReplConsumer
     {
         // read the cookie if persisted
         readCookie();
+        startCookieUpdater( config.getRefreshInterval() );
 
         if ( config.isRefreshPersist() )
         {
@@ -585,6 +582,7 @@ public class SyncReplConsumer
 
 
     /**
+     * FIXME store it in DiT
      * stores the cookie in a file.
      */
     private void storeCookie()
@@ -594,6 +592,14 @@ public class SyncReplConsumer
             return;
         }
 
+        if( lastSavedCookie != null )
+        {
+            if( Arrays.equals( syncCookie, lastSavedCookie ) )
+            {
+                return;
+            }
+        }
+        
         try
         {
             FileOutputStream fout = new FileOutputStream( cookieFile );
@@ -601,6 +607,9 @@ public class SyncReplConsumer
             fout.write( syncCookie );
             fout.close();
 
+            lastSavedCookie = new byte[ syncCookie.length ];
+            System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
+            
             LOG.debug( "stored the cookie" );
         }
         catch ( Exception e )
@@ -624,6 +633,9 @@ public class SyncReplConsumer
                 fin.read( syncCookie );
                 fin.close();
 
+                lastSavedCookie = new byte[ syncCookie.length ];
+                System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
+                
                 LOG.debug( "read the cookie from file: " + StringTools.utf8ToString( syncCookie
) );
             }
         }
@@ -878,4 +890,32 @@ public class SyncReplConsumer
         }
     }
 
+    
+    private void startCookieUpdater( final long delayMillis )
+    {
+        Runnable cookieSaver = new Runnable()
+        {
+            public void run()
+            {
+                while( !disconnected )
+                {
+                    storeCookie();
+                    try
+                    {
+                        Thread.sleep( delayMillis );
+                    }
+                    catch( Exception e )
+                    {
+                        LOG.warn( "Cookie saver task encountered an exception while sleeping",
e );
+                    }
+                }
+            }
+        };
+        
+        Thread cookieSaverThread = new Thread( cookieSaver );
+        cookieSaverThread.setDaemon( true );
+        
+        LOG.debug( "starting the cookie saver thread" );
+        cookieSaverThread.start();
+    }
 }

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java?rev=943893&r1=943892&r2=943893&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncreplConfiguration.java
Thu May 13 14:13:19 2010
@@ -50,8 +50,8 @@ public class SyncreplConfiguration
     /** flag to represent refresh and persist or refresh only mode, defaults to true */
     private boolean refreshPersist = true;
 
-    /** time interval for successive sync requests, default is 5 seconds */
-    private long refreshInterval = 5 * 1000;
+    /** time interval for successive sync requests, default is 60 seconds */
+    private long refreshInterval = 60 * 1000;
 
     /** the base DN whose content will be searched for replicating */
     private String baseDn;
@@ -82,6 +82,8 @@ public class SyncreplConfiguration
     /** the replica's id */
     private int replicaId;
 
+    
+    private static final String REPL_CONFIG_AREA = "ou=replProviders,ou=config";
 
     /**
      * @return the providerHost
@@ -196,6 +198,10 @@ public class SyncreplConfiguration
      */
     public void setRefreshInterval( long refreshInterval )
     {
+        if( refreshInterval <= 0 )
+        {
+            throw new IllegalArgumentException( "refresh interval should be more than zero"
);
+        }
         this.refreshInterval = refreshInterval;
     }
 
@@ -354,4 +360,9 @@ public class SyncreplConfiguration
         this.cookie = cookie;
     }
     
+    
+    public String getConfigEntryDn()
+    {
+        return "ads-dsReplicaId=" + replicaId + "," + REPL_CONFIG_AREA; 
+    }
 }



Mime
View raw message