directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: r476463 - in /directory/trunks/apacheds/mitosis/src: main/java/org/apache/directory/mitosis/configuration/ main/java/org/apache/directory/mitosis/service/ main/java/org/apache/directory/mitosis/service/protocol/handler/ test/java/org/apache...
Date Sat, 18 Nov 2006 07:52:13 GMT
Author: trustin
Date: Fri Nov 17 23:52:12 2006
New Revision: 476463

URL: http://svn.apache.org/viewvc?view=rev&rev=476463
Log:
Related issue: https://issues.apache.org/jira/browse/DIRSERVER-781 (Manual synchronous replication)
* Added ReplicationConfiguration.replicationInterval property that allows disabling automatic
replication
* Added ReplicationService.replicate() which forces immediate replication
* Changed ReplicationServiceIntegrationTest to use this new feature.

Modified:
    directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/configuration/ReplicationConfiguration.java
    directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java
    directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java
    directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java
    directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java
    directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
    directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java
    directory/trunks/apacheds/mitosis/src/test/java/org/apache/directory/mitosis/service/ReplicationServiceIntegrationTest.java

Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/configuration/ReplicationConfiguration.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/configuration/ReplicationConfiguration.java?view=diff&rev=476463&r1=476462&r2=476463
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/configuration/ReplicationConfiguration.java
(original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/configuration/ReplicationConfiguration.java
Fri Nov 17 23:52:12 2006
@@ -51,6 +51,7 @@
     private ReplicaId replicaId;
     private int serverPort = 7846;
     private int responseTimeout = 60;
+    private int replicationInterval = 5;
 
     private final Set<Replica> peerReplicas = new HashSet<Replica>();
     
@@ -88,8 +89,31 @@
         this.responseTimeout = responseTimeout;
     }
 
+    /**
+     * Returns the replication data exchange interval between two replicas in seconds.
+     * The default value is <tt>5</tt> seconds.
+     * 
+     * @return <tt>0</tt> if automatic replication is disabled
+     */
+    public int getReplicationInterval() {
+		return replicationInterval;
+	}
+
+
+    /**
+     * Sets the replication data exchange interval between two replicas in seconds.
+     * 
+     * @param replicationInterval <tt>0</tt> or below to disable automatic replication.
+     */
+	public void setReplicationInterval(int replicationInterval) {
+		if( replicationInterval < 0 ) {
+			replicationInterval = 0;
+		}
+		this.replicationInterval = replicationInterval;
+	}
 
-    public CSNFactory getCsnFactory()
+
+	public CSNFactory getCsnFactory()
     {
         return csnFactory;
     }

Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java?view=diff&rev=476463&r1=476462&r2=476463
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java
(original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java
Fri Nov 17 23:52:12 2006
@@ -29,6 +29,7 @@
 import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
 import org.apache.directory.mitosis.service.protocol.codec.ReplicationClientProtocolCodecFactory;
 import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientProtocolHandler;
+import org.apache.directory.mitosis.service.protocol.handler.ReplicationProtocolHandler;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.ExecutorThreadModel;
 import org.apache.mina.common.IoConnector;
@@ -101,6 +102,26 @@
         connector.getFilterChain().clear();
 
         ( ( ExecutorService ) ( ( ExecutorThreadModel ) connectorConfig.getThreadModel()
).getExecutor() ).shutdown();
+        
+        // Remove all status values.
+        sessions.clear();
+    }
+    
+    public void replicate()
+    {
+        // FIXME Can get ConcurrentModificationException.
+        for( Iterator i = sessions.values().iterator(); i.hasNext(); )
+        {
+            Connection con = ( Connection ) i.next();
+            synchronized( con )
+            {
+                // Begin replication for the connected replicas.
+                if ( con.session != null )
+                {
+                    ( ( ReplicationProtocolHandler ) con.session.getHandler() ).getContext(
con.session ).replicate();
+                }
+            }
+        }
     }
 
     private class ConnectionMonitor extends Thread
@@ -111,6 +132,19 @@
         public ConnectionMonitor()
         {
             super( "ClientConnectionManager" );
+            
+            // Initialize the status map.
+            Iterator i = configuration.getPeerReplicas().iterator();
+            while ( i.hasNext() )
+            {
+                Replica replica = ( Replica ) i.next();
+                Connection con = ( Connection ) sessions.get( replica.getId() );
+                if ( con == null )
+                {
+                    con = new Connection();
+                    sessions.put( replica.getId(), con );
+                }
+            }
         }
 
 
@@ -155,6 +189,8 @@
             Iterator i = configuration.getPeerReplicas().iterator();
             while ( i.hasNext() )
             {
+                // Someone might have modified the configuration,
+                // and therefore we try to detect newly added replicas.
                 Replica replica = ( Replica ) i.next();
                 Connection con = ( Connection ) sessions.get( replica.getId() );
                 if ( con == null )

Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java?view=diff&rev=476463&r1=476462&r2=476463
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java
(original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationContext.java
Fri Nov 17 23:52:12 2006
@@ -65,6 +65,17 @@
 
 
     int getScheduledExpirations();
+    
+    
+    /**
+     * Forces this context to send replication data to the peer replica immediately.
+     * 
+     * @return <tt>true</tt> if the replication has been started,
+     *         <tt>false</tt> if the replication didn't start because
+     *         the replication process is already in progress or
+     *         the client is currently logging in to the server yet.
+     */
+    boolean replicate();
 
     public static class State
     {

Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java?view=diff&rev=476463&r1=476462&r2=476463
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java
(original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationService.java
Fri Nov 17 23:52:12 2006
@@ -185,6 +185,16 @@
         }
         registry.unbindAll();
     }
+    
+    
+    /**
+     * Forces this service to send replication data to every connected peer replica immediately.
+     */
+    public void replicate()
+    {
+        log.info( "Forcing replication..." );
+        this.clientConnectionManager.replicate();
+    }
 
 
     public void purgeAgedData() throws NamingException

Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java?view=diff&rev=476463&r1=476462&r2=476463
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java
(original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/SimpleReplicationContext.java
Fri Nov 17 23:52:12 2006
@@ -26,12 +26,16 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
-import org.apache.directory.server.core.DirectoryServiceConfiguration;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.util.SessionLog;
 import org.apache.directory.mitosis.common.Replica;
 import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
+import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientContextHandler;
+import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientProtocolHandler;
+import org.apache.directory.mitosis.service.protocol.handler.ReplicationContextHandler;
+import org.apache.directory.mitosis.service.protocol.handler.ReplicationProtocolHandler;
 import org.apache.directory.mitosis.service.protocol.message.BaseMessage;
+import org.apache.directory.server.core.DirectoryServiceConfiguration;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.SessionLog;
 
 
 public class SimpleReplicationContext implements ReplicationContext
@@ -136,6 +140,20 @@
 
         task.cancel();
         return task.message;
+    }
+    
+    public boolean replicate()
+    {
+        ReplicationProtocolHandler handler =
+            ( ReplicationProtocolHandler ) this.session.getHandler();
+        if( !( handler instanceof ReplicationClientProtocolHandler ) )
+        {
+            throw new UnsupportedOperationException(
+                    "Only clients can begin replication." );
+        }
+        
+        ReplicationContextHandler contextHandler = ( ( ReplicationProtocolHandler ) handler
).getContextHandler();
+        return ( ( ReplicationClientContextHandler ) contextHandler ).beginReplication( this
);
     }
 
 

Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java?view=diff&rev=476463&r1=476462&r2=476463
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
(original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
Fri Nov 17 23:52:12 2006
@@ -76,8 +76,10 @@
         // Set write timeout
         ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() );
 
-        // Check update vector of the remote peer every 5 seconds.
-        ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, 5 );
+        // Check update vector of the remote peer periodically.
+        ctx.getSession().setIdleTime(
+        		IdleStatus.BOTH_IDLE,
+        		ctx.getConfiguration().getReplicationInterval());
     }
 
 
@@ -141,15 +143,7 @@
 
     public void contextIdle( ReplicationContext ctx, IdleStatus status ) throws Exception
     {
-        // If this cilent is logged in, all responses for sent messages
-        // (LogEntryMessages) is received, and no write request is pending,
-        // it means previous replication process ended or this is the
-        // first replication attempt.
-        if ( ctx.getState() == State.READY && ctx.getScheduledExpirations() == 0
-            && ctx.getSession().getScheduledWriteRequests() == 0 )
-        {
-            beginReplication( ctx );
-        }
+        beginReplication( ctx );
     }
 
 
@@ -173,8 +167,6 @@
                 {
                     ctx.setPeer( replica );
                     ctx.setState( State.READY );
-
-                    beginReplication( ctx );
                     return;
                 }
                 else
@@ -192,10 +184,23 @@
     }
 
 
-    private void beginReplication( ReplicationContext ctx )
+    public boolean beginReplication( ReplicationContext ctx )
     {
-        // Initiate replication process asking update vector.
-        ctx.getSession().write( new BeginLogEntriesMessage( ctx.getNextSequence() ) );
+        // If this client is logged in, all responses for sent messages
+        // (LogEntryMessages) are received, and no write request is pending,
+        // it means the previous replication process ended or this is the
+        // first replication attempt.
+        if ( ctx.getState() == State.READY && ctx.getScheduledExpirations() == 0
+            && ctx.getSession().getScheduledWriteRequests() == 0 )
+        {
+        	// Initiate replication process asking update vector.
+        	ctx.getSession().write( new BeginLogEntriesMessage( ctx.getNextSequence() ) );
+        	return true;
+        }
+        else
+        {
+        	return false;
+        }
     }
 
 

Modified: directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java?view=diff&rev=476463&r1=476462&r2=476463
==============================================================================
--- directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java
(original)
+++ directory/trunks/apacheds/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationProtocolHandler.java
Fri Nov 17 23:52:12 2006
@@ -52,9 +52,13 @@
     }
 
 
-    private ReplicationContext getContext( IoSession session )
+    public ReplicationContext getContext( IoSession session )
     {
         return ( ReplicationContext ) session.getAttribute( CONTEXT );
+    }
+    
+    public ReplicationContextHandler getContextHandler() {
+    	return contextHandler;
     }
 
 

Modified: directory/trunks/apacheds/mitosis/src/test/java/org/apache/directory/mitosis/service/ReplicationServiceIntegrationTest.java
URL: http://svn.apache.org/viewvc/directory/trunks/apacheds/mitosis/src/test/java/org/apache/directory/mitosis/service/ReplicationServiceIntegrationTest.java?view=diff&rev=476463&r1=476462&r2=476463
==============================================================================
--- directory/trunks/apacheds/mitosis/src/test/java/org/apache/directory/mitosis/service/ReplicationServiceIntegrationTest.java
(original)
+++ directory/trunks/apacheds/mitosis/src/test/java/org/apache/directory/mitosis/service/ReplicationServiceIntegrationTest.java
Fri Nov 17 23:52:12 2006
@@ -62,6 +62,7 @@
     private final Logger log = LoggerFactory.getLogger( ReplicationServiceIntegrationTest.class
);
 
     private Map contexts = new HashMap();
+    private Map replicationServices = new HashMap();
 
     protected void setUp() throws Exception
     {
@@ -82,7 +83,7 @@
         entry.put( "objectClass", "top" );
         ctxA.bind( "cn=test,ou=system", entry );
 
-        Thread.sleep( 5000 );
+        replicate( "A" );
 
         LdapContext ctxB = getReplicaContext( "B" );
         Assert.assertNotNull( ctxB.lookup( "cn=test,ou=system" ) );
@@ -128,6 +129,8 @@
 
             ReplicationConfiguration replicationCfg = new ReplicationConfiguration();
             replicationCfg.setReplicaId( replica.getId() );
+            // Disable automatic replication to prevent unexpected behavior
+            replicationCfg.setReplicationInterval(0);
             replicationCfg.setServerPort( replica.getAddress().getPort() );
             for( int j = 0; j < replicas.length; j++ )
             {
@@ -164,6 +167,7 @@
             // Initialize the server instance.
             LdapContext context = new InitialLdapContext( env, null );
             contexts.put( replicaId, context );
+            replicationServices.put( replicaId, replicationService );
         }
     }
 
@@ -176,6 +180,14 @@
         }
 
         return context;
+    }
+    
+    private void replicate( String name ) throws Exception
+    {
+        Thread.sleep( 2000 );
+        ReplicationService service = ( ReplicationService ) replicationServices.get( name
);
+        service.replicate();
+        Thread.sleep( 2000 );
     }
 
     private void destroyAllReplicas() throws Exception



Mime
View raw message