directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From elecha...@apache.org
Subject svn commit: r1157271 - in /directory/apacheds/trunk: protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ protocol-ldap/src/test/java/org/apache/directory/server/ldap/ server-integ/src/test/java/org/apache/directory/server/replicat...
Date Fri, 12 Aug 2011 22:23:36 GMT
Author: elecharny
Date: Fri Aug 12 22:23:35 2011
New Revision: 1157271

URL: http://svn.apache.org/viewvc?rev=1157271&view=rev
Log:
Refactored the replication process :
o used a JdbmTable instead of ActiveMQ

Added:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessageSerializer.java
      - copied, changed from r1156696, directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ModificationSerializer.java
Removed:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/Modification.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ModificationSerializer.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java
Modified:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplConsumerManager.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaJournalCursor.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java
    directory/apacheds/trunk/protocol-ldap/src/test/java/org/apache/directory/server/ldap/JournalTest.java
    directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplConsumerManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplConsumerManager.java?rev=1157271&r1=1157270&r2=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplConsumerManager.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplConsumerManager.java Fri Aug 12 22:23:35 2011
@@ -279,7 +279,7 @@ public class ReplConsumerManager
     private ReplicaEventLog convertEntryToReplica( Entry entry ) throws Exception
     {
         String id = entry.get( SchemaConstants.ADS_DS_REPLICA_ID ).getString();
-        ReplicaEventLog replica = new ReplicaEventLog( Integer.parseInt( id ) );
+        ReplicaEventLog replica = new ReplicaEventLog( directoryService, Integer.parseInt( id ) );
 
         NotificationCriteria searchCriteria = new NotificationCriteria();
 

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java?rev=1157271&r1=1157270&r2=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java Fri Aug 12 22:23:35 2011
@@ -21,15 +21,22 @@
 package org.apache.directory.server.ldap.replication;
 
 
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQQueue;
+import java.io.File;
+import java.io.IOException;
+
+import jdbm.RecordManager;
+import jdbm.recman.BaseRecordManager;
+
+import org.apache.directory.server.core.DirectoryService;
 import org.apache.directory.server.core.event.EventType;
 import org.apache.directory.server.core.event.NotificationCriteria;
+import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
+import org.apache.directory.server.core.partition.impl.btree.jdbm.StringSerializer;
+import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.model.cursor.Cursor;
+import org.apache.directory.shared.ldap.model.cursor.Tuple;
+import org.apache.directory.shared.ldap.model.schema.SchemaManager;
+import org.apache.directory.shared.ldap.model.schema.comparators.SerializableComparator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,20 +84,14 @@ public class ReplicaEventLog implements 
     private boolean refreshNPersist;
 
     // fields that won't be serialized
-    /** the ActiveMQ session */
-    private ActiveMQSession amqSession;
-
-    /** the Queue used for storing messages */
-    private ActiveMQQueue queue;
+    /** The Journal */
+    private JdbmTable<String, ReplicaEventMessage> journal;
 
-    /** message producer for Queue */
-    private ActiveMQMessageProducer producer;
+    /** the underlying file  */
+    private File journalFile;
 
-    /** the messaging system's connection */
-    private ActiveMQConnection amqConnection;
-
-    /** ActiveMQ's BrokerService */
-    private BrokerService brokerService;
+    /** The record manager*/
+    private RecordManager recman;
 
     /** A flag used to indicate that the consumer is not up to date */
     private volatile boolean dirty;
@@ -100,31 +101,23 @@ public class ReplicaEventLog implements 
      * Creates a new instance of EventLog for a replica
      * @param replicaId The replica ID
      */
-    public ReplicaEventLog( int replicaId )
+    public ReplicaEventLog( DirectoryService directoryService, int replicaId ) throws IOException
     {
+        SchemaManager schemaManager = directoryService.getSchemaManager();
         this.replicaId = replicaId;
         this.searchCriteria = new NotificationCriteria();
         this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
-    }
 
+        // Create the journal file, or open it of it exists
+        File logDir = directoryService.getInstanceLayout().getLogDirectory();
+        journalFile = new File( logDir, "journalRepl." + replicaId );
+        recman = new BaseRecordManager( journalFile.getAbsolutePath() );
 
-    /**
-     * Instantiates a message queue and corresponding producer for storing DIT changes.
-     *
-     * @param amqConnection ActiveMQ connection
-     * @param brokerService ActiveMQ's broker service
-     * @throws Exception If the queue can't be created
-     */
-    public void configure( final ActiveMQConnection amqConnection, final BrokerService brokerService ) throws Exception
-    {
-        if ( ( amqSession == null ) || !amqSession.isRunning() )
-        {
-            this.amqConnection = amqConnection;
-            amqSession = ( ActiveMQSession ) amqConnection.createSession( false, ActiveMQSession.AUTO_ACKNOWLEDGE );
-            queue = ( ActiveMQQueue ) amqSession.createQueue( getQueueName() );
-            producer = ( ActiveMQMessageProducer ) amqSession.createProducer( queue );
-            this.brokerService = brokerService;
-        }
+        SerializableComparator<String> comparator = new SerializableComparator<String>( SchemaConstants.CSN_ORDERING_MATCH_MR_OID );
+        comparator.setSchemaManager( schemaManager );
+        
+        journal = new JdbmTable<String, ReplicaEventMessage>( schemaManager, "replication", recman, comparator, 
+            new StringSerializer(), new ReplicaEventMessageSerializer( schemaManager ) );
     }
 
 
@@ -137,12 +130,12 @@ public class ReplicaEventLog implements 
     {
         try
         {
-            LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(), message.getEventType() );
-            
-            ActiveMQObjectMessage ObjectMessage = ( ActiveMQObjectMessage ) amqSession.createObjectMessage();
-            ObjectMessage.setObject( message );
-            
-            producer.send( ObjectMessage );
+            LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(), message.getChangeType() );
+
+            String entryCsn = message.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString();
+            journal.put( entryCsn, message );
+            journal.sync();
+            recman.commit();
         }
         catch ( Exception e )
         {
@@ -159,12 +152,6 @@ public class ReplicaEventLog implements 
      */
     public void truncate() throws Exception
     {
-        producer.close();
-
-        String queueName = queue.getQueueName();
-        LOG.debug( "deleting the queue {}", queueName );
-        amqConnection.destroyDestination( queue );
-        queue = null;
     }
 
 
@@ -175,8 +162,6 @@ public class ReplicaEventLog implements 
     public void recreate() throws Exception
     {
         LOG.debug( "recreating the queue for the replica id {}", replicaId );
-        queue = ( ActiveMQQueue ) amqSession.createQueue( getQueueName() );
-        producer = ( ActiveMQMessageProducer ) amqSession.createProducer( queue );
     }
 
 
@@ -188,8 +173,19 @@ public class ReplicaEventLog implements 
     public void stop() throws Exception
     {
         // Close the producer and session, DO NOT close connection 
-        producer.close();
-        amqSession.close();
+        if ( journal != null )
+        {
+            journal.close();
+        }
+
+        journal = null;
+
+        if ( recman != null )
+        {
+            recman.close();
+        }
+
+        recman = null;
     }
 
 
@@ -407,11 +403,11 @@ public class ReplicaEventLog implements 
      * @return A cursor on top of the queue
      * @throws Exception If the cursor can't be created
      */
-    public ReplicaEventLogCursor getCursor( String consumerCsn ) throws Exception
+    public Cursor<Tuple<String, ReplicaEventMessage>> getCursor( String consumerCsn ) throws Exception
     {
-        Queue regionQueue = ( Queue ) brokerService.getRegionBroker().getDestinationMap().get( queue );
+        Cursor<Tuple<String, ReplicaEventMessage>> cursor = journal.cursor( consumerCsn );
         
-        return new ReplicaEventLogCursor( amqSession, queue, regionQueue, consumerCsn );
+        return cursor;
     }
 
 

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java?rev=1157271&r1=1157270&r2=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java Fri Aug 12 22:23:35 2011
@@ -21,29 +21,10 @@
 package org.apache.directory.server.ldap.replication;
 
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Iterator;
-
-import org.apache.directory.server.core.event.EventType;
-import org.apache.directory.shared.i18n.I18n;
-import org.apache.directory.shared.ldap.codec.api.LdapApiService;
-import org.apache.directory.shared.ldap.codec.api.LdapApiServiceFactory;
 import org.apache.directory.shared.ldap.extras.controls.SyncModifyDn;
-import org.apache.directory.shared.ldap.extras.controls.SyncModifyDnType;
-import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncModifyDnDecorator;
 import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
-import org.apache.directory.shared.ldap.model.entry.Attribute;
-import org.apache.directory.shared.ldap.model.entry.DefaultAttribute;
-import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
 import org.apache.directory.shared.ldap.model.entry.Entry;
-import org.apache.directory.shared.ldap.model.name.Dn;
-import org.apache.directory.shared.ldap.model.schema.AttributeType;
-import org.apache.directory.shared.ldap.model.schema.SchemaManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.directory.shared.ldap.model.message.controls.ChangeType;
 
 
 /**
@@ -51,47 +32,26 @@ import org.slf4j.LoggerFactory;
  *
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-public class ReplicaEventMessage implements Externalizable
+public class ReplicaEventMessage
 {
-    /** The logger */
-    private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventMessage.class );
-
-    /** The message event type */
-    private EventType eventType;
+    /** The message change type */
+    private ChangeType changeType;
     
     /** The entry */
     private Entry entry;
 
-    /** The SchemaManager instance */
-    private static SchemaManager schemaManager;
-
-    /** The LDAP codec used to serialize the entries */
-    private LdapApiService codec = LdapApiServiceFactory.getSingleton();
-
     /** The modifyDN control */
     private SyncModifyDn modDnControl;
     
 
     /**
-     *  creates an instance of ReplicaEventMessage.
-     *  
-     *  NOTE: This is required by the event log manager while deserializing.
-     *        <b>DO NOT remove</b>.
-     */
-    public ReplicaEventMessage()
-    {
-        codec = LdapApiServiceFactory.getSingleton();
-    }
-    
-    
-    /**
      * Create a new ReplicaEvent instance for a Add/Delete+Modify operation
-     * @param eventType The event type
+     * @param changeType The change type
      * @param entry The entry
      */
-    public ReplicaEventMessage( EventType eventType, Entry entry )
+    public ReplicaEventMessage( ChangeType changeType, Entry entry )
     {
-        this.eventType = eventType;
+        this.changeType = changeType;
         this.entry = entry;
     }
 
@@ -109,11 +69,11 @@ public class ReplicaEventMessage impleme
 
     
     /**
-     * @return The eventType
+     * @return The changeType
      */
-    public EventType getEventType()
+    public ChangeType getChangeType()
     {
-        return eventType;
+        return changeType;
     }
 
 
@@ -136,157 +96,6 @@ public class ReplicaEventMessage impleme
 
     
     /**
-     * {@inheritDoc}
-     */
-    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException
-    {
-
-        byte b = in.readByte();
-        
-        if ( b == 0 ) // handle the SyncModDnControl
-        {
-            SyncModifyDnType modDnType = SyncModifyDnType.getModifyDnType( in.readShort() );
-            
-            modDnControl = new SyncModifyDnDecorator( codec );
-            modDnControl.setModDnType( modDnType );
-            
-            modDnControl.setEntryDn( in.readUTF() );
-            
-            switch ( modDnType )
-            {
-                case MOVE:
-                    modDnControl.setNewSuperiorDn( in.readUTF() );
-                    break;
-                   
-                case RENAME:
-                    modDnControl.setNewRdn( in.readUTF() );
-                    modDnControl.setDeleteOldRdn( in.readBoolean() );
-                    break;
-                    
-                case MOVE_AND_RENAME:
-                    modDnControl.setNewSuperiorDn( in.readUTF() );
-                    modDnControl.setNewRdn( in.readUTF() );
-                    modDnControl.setDeleteOldRdn( in.readBoolean() );
-            }
-        }
-        else // read the event type
-        {
-            eventType = EventType.getType( in.readShort() );
-        }
-
-        // initialize the entry
-        entry = new DefaultEntry( schemaManager );
-
-        // Read the Dn
-        Dn dn = null;
-        
-        try
-        {
-            dn = new Dn( schemaManager );
-            dn.readExternal( in );
-        }
-        catch ( ClassNotFoundException cnfe )
-        {
-            IOException ioe = new IOException( cnfe.getMessage() );
-            ioe.initCause( cnfe );
-            throw ioe;
-        }
-        
-        entry.setDn( dn );
-
-        // Read the number of attributes
-        int nbAttributes = in.readInt();
-
-        // Read the attributes
-        for ( int i = 0; i < nbAttributes; i++ )
-        {
-            // Read the attribute's OID
-            String oid = in.readUTF();
-
-            try
-            {
-                AttributeType attributeType = schemaManager.lookupAttributeTypeRegistry( oid );
-
-                // Create the attribute we will read
-                DefaultAttribute attribute = new DefaultAttribute( attributeType );
-
-                // Read the attribute
-                attribute.readExternal( in );
-
-                entry.add( attribute );
-            }
-            catch ( Exception ne )
-            {
-                entry = null;
-                // We weren't able to find the OID. The attribute will not be added
-                LOG.warn( I18n.err( I18n.ERR_04470, oid ) );
-            }
-        }
-    }
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public void writeExternal( ObjectOutput out ) throws IOException
-    {
-        if ( eventType == null )
-        {
-            out.writeByte( 0 );
-            
-            SyncModifyDnType modDnType = modDnControl.getModDnType();
-            out.writeShort( modDnType.getValue() );
-            out.writeUTF( modDnControl.getEntryDn() );
-            
-            switch ( modDnType )
-            {
-                case MOVE:
-                    out.writeUTF( modDnControl.getNewSuperiorDn() );
-                    break;
-                   
-                case RENAME:
-                    out.writeUTF( modDnControl.getNewRdn() );
-                    out.writeBoolean( modDnControl.isDeleteOldRdn() );
-                    break;
-                    
-                case MOVE_AND_RENAME:
-                    out.writeUTF( modDnControl.getNewSuperiorDn() );
-                    out.writeUTF( modDnControl.getNewRdn() );
-                    out.writeBoolean( modDnControl.isDeleteOldRdn() );
-            }
-        }
-        else
-        {
-            out.writeByte( 1 );
-            out.writeShort( eventType.getMask() );
-        }
-
-        // then Dn
-        entry.getDn().writeExternal( out );
-
-        // Then the attributes.
-        out.writeInt( entry.size() );
-
-        // Iterate through the keys. We store the Attribute
-        // here, to be able to restore it in the readExternal :
-        // we need access to the registries, which are not available
-        // in the ServerAttribute class.
-        Iterator<Attribute> attrItr = entry.iterator();
-        
-        while ( attrItr.hasNext() )
-        {
-            DefaultAttribute attribute = ( DefaultAttribute ) attrItr.next();
-            // Write the oid to be able to restore the AttributeType when deserializing
-            // the attribute
-            out.writeUTF( attribute.getAttributeType().getOid() );
-
-            // Write the attribute
-            attribute.writeExternal( out );
-        }
-    }
-
-    
-    /**
      * checks if the event's CSN is older than the given CSN
      *
      * @param csn the CSN
@@ -301,13 +110,4 @@ public class ReplicaEventMessage impleme
         
         return ( i < 0 );
     }
-
-    /**
-     * Set the SchemaManager 
-     * @param schemaManager The SchemaManager instance
-     */
-    public static void setSchemaManager( SchemaManager schemaManager )
-    {
-        ReplicaEventMessage.schemaManager = schemaManager;
-    }
 }

Copied: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessageSerializer.java (from r1156696, directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ModificationSerializer.java)
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessageSerializer.java?p2=directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessageSerializer.java&p1=directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ModificationSerializer.java&r1=1156696&r2=1157271&rev=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ModificationSerializer.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessageSerializer.java Fri Aug 12 22:23:35 2011
@@ -27,9 +27,16 @@ import java.io.ObjectOutputStream;
 
 import jdbm.helper.Serializer;
 
-import org.apache.directory.server.core.event.EventType;
 import org.apache.directory.server.core.partition.impl.btree.jdbm.EntrySerializer;
+import org.apache.directory.shared.ldap.codec.api.LdapApiService;
+import org.apache.directory.shared.ldap.codec.api.LdapApiServiceFactory;
+import org.apache.directory.shared.ldap.extras.controls.SyncModifyDn;
+import org.apache.directory.shared.ldap.extras.controls.SyncModifyDnType;
+import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncModifyDnDecorator;
+import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
 import org.apache.directory.shared.ldap.model.entry.Entry;
+import org.apache.directory.shared.ldap.model.message.controls.ChangeType;
+import org.apache.directory.shared.ldap.model.name.Dn;
 import org.apache.directory.shared.ldap.model.schema.SchemaManager;
 
 /**
@@ -43,7 +50,7 @@ import org.apache.directory.shared.ldap.
  * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-public class ModificationSerializer implements Serializer
+public class ReplicaEventMessageSerializer implements Serializer
 {
     /** The serialVersionUID */
     private static final long serialVersionUID = 1L;
@@ -51,12 +58,15 @@ public class ModificationSerializer impl
     /** The internal entry serializer */
     private transient EntrySerializer entrySerializer;
 
+    /** The LDAP codec used to serialize the entries */
+    private transient LdapApiService codec = LdapApiServiceFactory.getSingleton();
+
     /**
-     * Creates a new instance of ServerEntrySerializer.
+     * Creates a new instance of ReplicaEventMessageSerializer.
      *
      * @param schemaManager The reference to the global schemaManager
      */
-    public ModificationSerializer( SchemaManager schemaManager )
+    public ReplicaEventMessageSerializer( SchemaManager schemaManager )
     {
         entrySerializer = new EntrySerializer( schemaManager );
     }
@@ -67,25 +77,46 @@ public class ModificationSerializer impl
      */
     public byte[] serialize( Object object ) throws IOException
     {
-        Modification modification = (Modification)object;
+        ReplicaEventMessage replicaEventMessage = (ReplicaEventMessage)object;
     
-        Entry entry = modification.getEntry();
-        EventType type = modification.getEventType();
+        Entry entry = replicaEventMessage.getEntry();
+        ChangeType changeType = replicaEventMessage.getChangeType();
+        SyncModifyDn modDnControl = replicaEventMessage.getModDnControl();
         
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream( baos );
 
-        // The event type
-        out.writeByte( type.getMask() );
+        // The change type
+        out.writeByte( changeType.getValue() );
+        
+        // The entry DN
+        entry.getDn().writeExternal( out );
         
         // The entry
-        byte[] data = entrySerializer.serialize( entry );
-
-        // Entry's length
-        out.writeInt( data.length );
+        entry.writeExternal( out );
         
-        // Entry's data
-        out.write( data );
+        // The moddn control if any (only if it's a MODDN operation)
+        if ( changeType == ChangeType.MODDN )
+        {
+            SyncModifyDnType modDnType = modDnControl.getModDnType();
+            out.writeByte( modDnType.getValue() );
+
+            switch ( modDnType )
+            {
+                case MOVE:
+                    out.writeUTF( modDnControl.getNewSuperiorDn() );
+                    break;
+                   
+                case MOVE_AND_RENAME:
+                    out.writeUTF( modDnControl.getNewSuperiorDn() );
+                    // Fall through
+
+                case RENAME:
+                    out.writeUTF( modDnControl.getNewRdn() );
+                    out.writeBoolean( modDnControl.isDeleteOldRdn() );
+                    break;
+            }
+        }
         
         out.flush();
 
@@ -94,30 +125,71 @@ public class ModificationSerializer impl
 
     
     /**
-     *  Deserialize a Modification.
+     *  Deserialize a ReplicaEventMessage.
      *  
-     *  @param bytes the byte array containing the serialized modification
-     *  @return An instance of a Modification object 
-     *  @throws IOException if we can't deserialize the Modification
+     *  @param bytes the byte array containing the serialized ReplicaEventMessage
+     *  @return An instance of a ReplicaEventMessage object 
+     *  @throws IOException if we can't deserialize the ReplicaEventMessage
      */
     public Object deserialize( byte[] bytes ) throws IOException
     {
         ObjectInputStream in = new ObjectInputStream( new ByteArrayInputStream( bytes ) );
 
-        // Read the eventType
+        // Read the changeType
         byte type = in.readByte();
+        ChangeType changeType = ChangeType.getChangeType( type );
+        ReplicaEventMessage replicaEventMessage = null;
+
+        try
+        {
+            // The Entry's DN
+            Dn entryDn = new Dn();
+            entryDn.readExternal( in );
+
+            // Read the Entry
+            Entry entry = new DefaultEntry();
+            entry.readExternal( in );
+            entry.setDn( entryDn );
+            
+            if ( changeType == ChangeType.MODDN )
+            {
+                type = in.readByte();
+                SyncModifyDnType modDnType = SyncModifyDnType.getModifyDnType( type );
+                SyncModifyDn modDnControl = new SyncModifyDnDecorator( codec );
+                
+                modDnControl.setModDnType( modDnType );
+                
+                switch ( modDnType )
+                {
+                    case MOVE :
+                        modDnControl.setNewSuperiorDn( in.readUTF() );
+                        break;
+
+                    case MOVE_AND_RENAME :
+                        modDnControl.setNewSuperiorDn( in.readUTF() );
+                        // Fallthrough
+
+                    case RENAME :
+                        modDnControl.setNewRdn( in.readUTF() );
+                        modDnControl.setDeleteOldRdn( in.readBoolean() );
+                        break;
+                }
+                
+                // And create a ReplicaEventMessage
+                replicaEventMessage = new ReplicaEventMessage( modDnControl, entry );
+
+            }
+            else
+            {
+                // And create a ReplicaEventMessage
+                replicaEventMessage = new ReplicaEventMessage( changeType, entry );
+            }
+        }
+        catch ( ClassNotFoundException cnfe )
+        {
+            // there is nothing we can do here...
+        }
         
-        // The Entry's length
-        int length = in.readInt();
-        byte[] data = new byte[length];
-        
-        // The entry itself
-        in.read( data );
-        Entry entry = (Entry)entrySerializer.deserialize( data );
-        
-        // And create a modification
-        Modification modification = new Modification( EventType.getType( type ), entry );
-        
-        return modification;
+        return replicaEventMessage;
     }
 }

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaJournalCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaJournalCursor.java?rev=1157271&r1=1157270&r2=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaJournalCursor.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaJournalCursor.java Fri Aug 12 22:23:35 2011
@@ -22,11 +22,11 @@ package org.apache.directory.server.ldap
 
 import java.util.Iterator;
 
-import org.apache.directory.server.core.event.EventType;
 import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
 import org.apache.directory.shared.ldap.model.cursor.AbstractCursor;
 import org.apache.directory.shared.ldap.model.cursor.Cursor;
 import org.apache.directory.shared.ldap.model.cursor.Tuple;
+import org.apache.directory.shared.ldap.model.message.controls.ChangeType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 public class ReplicaJournalCursor extends AbstractCursor<ReplicaEventMessage>
 {
     /** Logger for this class */
-    private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLogCursor.class );
+    private static final Logger LOG = LoggerFactory.getLogger( ReplicaJournalCursor.class );
     
     /** the underlying journal's cursor */
     private Cursor<Tuple<String, ReplicaEventMessage>> tupleCursor;
@@ -146,10 +146,11 @@ public class ReplicaJournalCursor extend
             {
                 String evt = "MODDN"; // take this as default cause the event type for MODDN is null
                 
-                EventType evtType = qualifiedEvtMsg.getEventType();
-                if ( evtType != null )
+                ChangeType changeType = qualifiedEvtMsg.getChangeType();
+                
+                if ( changeType != null )
                 {
-                    evt = evtType.name();
+                    evt = changeType.name();
                 }
                 
                 LOG.debug( "event {} for dn {} is not qualified for sending", evt, qualifiedEvtMsg.getEntry().getDn() );

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java?rev=1157271&r1=1157270&r2=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java Fri Aug 12 22:23:35 2011
@@ -27,16 +27,12 @@ import static org.apache.directory.serve
 
 import java.io.File;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
 import org.apache.directory.server.core.DirectoryService;
 import org.apache.directory.server.core.event.EventType;
 import org.apache.directory.server.core.event.NotificationCriteria;
@@ -58,6 +54,8 @@ import org.apache.directory.shared.ldap.
 import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncStateValueDecorator;
 import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
 import org.apache.directory.shared.ldap.model.csn.Csn;
+import org.apache.directory.shared.ldap.model.cursor.Cursor;
+import org.apache.directory.shared.ldap.model.cursor.Tuple;
 import org.apache.directory.shared.ldap.model.entry.Attribute;
 import org.apache.directory.shared.ldap.model.entry.Entry;
 import org.apache.directory.shared.ldap.model.entry.StringValue;
@@ -84,6 +82,7 @@ import org.apache.directory.shared.ldap.
 import org.apache.directory.shared.ldap.model.message.SearchResultReference;
 import org.apache.directory.shared.ldap.model.message.SearchResultReferenceImpl;
 import org.apache.directory.shared.ldap.model.message.SearchScope;
+import org.apache.directory.shared.ldap.model.message.controls.ChangeType;
 import org.apache.directory.shared.ldap.model.message.controls.ManageDsaIT;
 import org.apache.directory.shared.ldap.model.schema.AttributeType;
 import org.apache.directory.shared.ldap.model.url.LdapUrl;
@@ -119,10 +118,6 @@ public class SyncReplRequestHandler impl
 
     private Map<Integer, ReplicaEventLog> replicaLogMap = new HashMap<Integer, ReplicaEventLog>();
 
-    private BrokerService brokerService;
-
-    private ActiveMQConnection amqConnection;
-
     private File syncReplData;
 
     private AtomicInteger replicaCount = new AtomicInteger( 0 );
@@ -167,28 +162,6 @@ public class SyncReplRequestHandler impl
                 syncReplData.mkdirs();
             }
 
-            String path = syncReplData.getPath();
-
-            // Create the system responsible for transmitting the requests when
-            // the direct connection to the consumer is closed.
-            brokerService = new BrokerService();
-            brokerService.setUseJmx( false );
-            brokerService.setPersistent( true );
-            brokerService.setDataDirectory( path );
-
-            URI vmConnectorUri = new URI( "vm://localhost" );
-            brokerService.setVmConnectorURI( vmConnectorUri );
-
-            brokerService.start();
-            ActiveMQConnectionFactory amqFactory = new ActiveMQConnectionFactory( vmConnectorUri.toString() );
-            amqFactory.setObjectMessageSerializationDefered( false );
-
-            amqConnection = ( ActiveMQConnection ) amqFactory.createConnection();
-            amqConnection.start();
-
-            // set the static reference to SchemaManager
-            ReplicaEventMessage.setSchemaManager( dirService.getSchemaManager() );
-
             replicaUtil = new ReplConsumerManager( dirService );
 
             loadReplicaInfo();
@@ -225,24 +198,6 @@ public class SyncReplRequestHandler impl
             }
         }
         
-        try
-        {
-            amqConnection.close();
-        }
-        catch ( Exception e )
-        {
-            LOG.warn( "Failed to close the message queue connection", e );
-        }
-
-        try
-        {
-            brokerService.stop();
-        }
-        catch ( Exception e )
-        {
-            LOG.warn( "Failed to close the message broker service", e );
-        }
-
         initialized = false;
     }
 
@@ -318,13 +273,14 @@ public class SyncReplRequestHandler impl
         // do the search from the log
         String lastSentCsn = clientMsgLog.getLastSentCsn();
 
-        ReplicaEventLogCursor cursor = clientMsgLog.getCursor( consumerCsn );
+        Cursor<Tuple<String, ReplicaEventMessage>> cursor = clientMsgLog.getCursor( consumerCsn );
         //int i = 0;
         
         while ( cursor.next() )
         {
-            ReplicaEventMessage message = cursor.get();
-            Entry entry = message.getEntry();
+            Tuple<String, ReplicaEventMessage> tuple = cursor.get();
+            ReplicaEventMessage replicaEventMessage = tuple.getValue();
+            Entry entry = replicaEventMessage.getEntry();
             LOG.debug( "received message from the queue {}", entry );
             //i++;
             
@@ -332,12 +288,12 @@ public class SyncReplRequestHandler impl
 
             lastSentCsn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
 
-            EventType event = message.getEventType();
+            ChangeType event = replicaEventMessage.getChangeType();
 
             // if event type is null, then it is a MODDN operation
-            if ( event == null )
+            if ( event == ChangeType.MODDN )
             {
-                sendSearchResultEntry( session, req, entry, message.getModDnControl() );
+                sendSearchResultEntry( session, req, entry, replicaEventMessage.getModDnControl() );
             }
             else
             {
@@ -917,7 +873,6 @@ public class SyncReplRequestHandler impl
                 for ( ReplicaEventLog replica : eventLogs )
                 {
                     LOG.debug( "initializing the replica log from {}", replica.getId() );
-                    replica.configure( amqConnection, brokerService );
                     replicaLogMap.put( replica.getId(), replica );
 
                     // update the replicaCount's value to assign a correct value to the new replica(s) 
@@ -1068,12 +1023,10 @@ public class SyncReplRequestHandler impl
 
         LOG.debug( "creating a new event log for the replica with id {}", replicaId );
 
-        ReplicaEventLog replicaLog = new ReplicaEventLog( replicaId );
+        ReplicaEventLog replicaLog = new ReplicaEventLog( dirService, replicaId );
         replicaLog.setHostName( hostName );
         replicaLog.setSearchFilter( filter );
 
-        replicaLog.configure( amqConnection, brokerService );
-
         return replicaLog;
     }
 

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java?rev=1157271&r1=1157270&r2=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java Fri Aug 12 22:23:35 2011
@@ -21,6 +21,7 @@ package org.apache.directory.server.ldap
 
 
 import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.entry.ClonedServerEntry;
 import org.apache.directory.server.core.event.DirectoryListener;
 import org.apache.directory.server.core.event.EventType;
 import org.apache.directory.server.core.interceptor.context.AddOperationContext;
@@ -45,6 +46,7 @@ import org.apache.directory.shared.ldap.
 import org.apache.directory.shared.ldap.model.message.SearchRequest;
 import org.apache.directory.shared.ldap.model.message.SearchResultEntry;
 import org.apache.directory.shared.ldap.model.message.SearchResultEntryImpl;
+import org.apache.directory.shared.ldap.model.message.controls.ChangeType;
 import org.apache.directory.shared.util.Strings;
 import org.apache.mina.core.future.WriteFuture;
 import org.slf4j.Logger;
@@ -207,7 +209,7 @@ public class SyncReplSearchListener impl
         {
             //System.out.println( "ADD Listener : log " + entry.getDn() );
             // we log it first
-            consumerMsgLog.log( new ReplicaEventMessage( EventType.ADD, entry ) );
+            consumerMsgLog.log( new ReplicaEventMessage( ChangeType.ADD, ((ClonedServerEntry)entry).getClonedEntry() ) );
 
             // We send the added entry directly to the consumer if it's connected
             if ( pushInRealTime )
@@ -253,7 +255,7 @@ public class SyncReplSearchListener impl
         try
         {
             //System.out.println( "DELETE Listener : log " + entry.getDn() );
-            consumerMsgLog.log( new ReplicaEventMessage( EventType.DELETE, entry ) );
+            consumerMsgLog.log( new ReplicaEventMessage( ChangeType.DELETE, entry ) );
             
             if ( pushInRealTime )
             {
@@ -287,7 +289,7 @@ public class SyncReplSearchListener impl
         try
         {
             //System.out.println( "MODIFY Listener : log " + alteredEntry.getDn() );
-            consumerMsgLog.log( new ReplicaEventMessage( EventType.MODIFY, alteredEntry ) );
+            consumerMsgLog.log( new ReplicaEventMessage( ChangeType.MODIFY, alteredEntry ) );
             
             if ( pushInRealTime )
             {

Modified: directory/apacheds/trunk/protocol-ldap/src/test/java/org/apache/directory/server/ldap/JournalTest.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/test/java/org/apache/directory/server/ldap/JournalTest.java?rev=1157271&r1=1157270&r2=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/test/java/org/apache/directory/server/ldap/JournalTest.java (original)
+++ directory/apacheds/trunk/protocol-ldap/src/test/java/org/apache/directory/server/ldap/JournalTest.java Fri Aug 12 22:23:35 2011
@@ -27,11 +27,10 @@ import java.io.File;
 import jdbm.RecordManager;
 import jdbm.recman.BaseRecordManager;
 
-import org.apache.directory.server.core.event.EventType;
 import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
 import org.apache.directory.server.core.partition.impl.btree.jdbm.StringSerializer;
-import org.apache.directory.server.ldap.replication.Modification;
-import org.apache.directory.server.ldap.replication.ModificationSerializer;
+import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
+import org.apache.directory.server.ldap.replication.ReplicaEventMessageSerializer;
 import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
 import org.apache.directory.shared.ldap.model.csn.Csn;
 import org.apache.directory.shared.ldap.model.csn.CsnFactory;
@@ -39,6 +38,7 @@ import org.apache.directory.shared.ldap.
 import org.apache.directory.shared.ldap.model.cursor.Tuple;
 import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
 import org.apache.directory.shared.ldap.model.entry.Entry;
+import org.apache.directory.shared.ldap.model.message.controls.ChangeType;
 import org.apache.directory.shared.ldap.model.schema.SchemaManager;
 import org.apache.directory.shared.ldap.model.schema.comparators.SerializableComparator;
 import org.apache.directory.shared.ldap.schemaextractor.SchemaLdifExtractor;
@@ -53,7 +53,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 /**
- * A test to check that we can correctly create a Journal to store the modifications.
+ * A test to check that we can correctly create a Journal to store the ReplicaEventMessages.
  * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  *
@@ -72,7 +72,7 @@ public class JournalTest
     private static SchemaManager schemaManager;
 
     /** The Journal */
-    private JdbmTable<String, Modification> journal;
+    private JdbmTable<String, ReplicaEventMessage> journal;
     
     /** The CsnFactory */
     private static CsnFactory csnFactory;
@@ -126,12 +126,14 @@ public class JournalTest
 
         dbFile = File.createTempFile( getClass().getSimpleName(), "db", tmpDir );
         recman = new BaseRecordManager( dbFile.getAbsolutePath() );
+        ((BaseRecordManager)recman).disableTransactions();
 
         SerializableComparator<String> comparator = new SerializableComparator<String>( SchemaConstants.CSN_ORDERING_MATCH_MR_OID );
         comparator.setSchemaManager( schemaManager );
         
-        journal = new JdbmTable<String, Modification>( schemaManager, "test", recman, comparator, 
-            new StringSerializer(), new ModificationSerializer( schemaManager ) );
+        journal = new JdbmTable<String, ReplicaEventMessage>( schemaManager, "test", recman, comparator, 
+            new StringSerializer(), new ReplicaEventMessageSerializer( schemaManager ) );
+        
     }
 
     /**
@@ -168,7 +170,7 @@ public class JournalTest
     
 
     /**
-     * test that we can write 1000 modifications, and read them back in the right order 
+     * test that we can write 1000 ReplicaEventMessages, and read them back in the right order 
      * starting in the middle.
      */
     @Test
@@ -192,30 +194,30 @@ public class JournalTest
                 "entryCsn", entryCsn.toString()
                 );
             
-            Modification modification = new Modification( EventType.ADD, entry );
-            journal.put( entryCsn.toString(), modification );
+            ReplicaEventMessage replicaEventMessage = new ReplicaEventMessage( ChangeType.ADD, entry );
+            journal.put( entryCsn.toString(), replicaEventMessage );
             journal.sync();
 
             entryCsn = csnFactory.newInstance();
         }
         
-        // Now check that the modification has been written
-        Modification firstModification = journal.get( firstCsn.toString() );
+        // Now check that the ReplicaEventMessages has been written
+        ReplicaEventMessage firstMessage = journal.get( firstCsn.toString() );
         
-        assertEquals( EventType.ADD, firstModification.getEventType());
-        assertEquals( "test0", firstModification.getEntry().get( "ou" ).getString() );
+        assertEquals( ChangeType.ADD, firstMessage.getChangeType());
+        assertEquals( "test0", firstMessage.getEntry().get( "ou" ).getString() );
         
         // Read entry from the 100th element
-        Cursor<Tuple<String, Modification>> cursor = journal.cursor( csn100.toString() );
+        Cursor<Tuple<String, ReplicaEventMessage>> cursor = journal.cursor( csn100.toString() );
         int pos = 100;
         
         while ( cursor.next() )
         {
-            Tuple<String, Modification> tuple = cursor.get();
-            Modification modification = tuple.getValue();
+            Tuple<String, ReplicaEventMessage> tuple = cursor.get();
+            ReplicaEventMessage replicaEventMessage = tuple.getValue();
             
-            assertEquals( EventType.ADD, modification.getEventType());
-            assertEquals( "test" + pos, modification.getEntry().get( "ou" ).getString() );
+            assertEquals( ChangeType.ADD, replicaEventMessage.getChangeType());
+            assertEquals( "test" + pos, replicaEventMessage.getEntry().get( "ou" ).getString() );
             
             pos++;
         }
@@ -223,8 +225,8 @@ public class JournalTest
 
 
     /**
-     * test that we can write 1000 modifications, remove 500 of them, and read the 
-     * remining ones.
+     * test that we can write 1000 ReplicaEventMessages, remove 500 of them, and read the 
+     * remaining ones.
      */
     @Test
     public void testJournalTruncate() throws Exception
@@ -240,40 +242,41 @@ public class JournalTest
                 "entryCsn", entryCsn.toString()
                 );
             
-            Modification modification = new Modification( EventType.ADD, entry );
-            journal.put( entryCsn.toString(), modification );
+            ReplicaEventMessage replicaEventMessage = new ReplicaEventMessage( ChangeType.ADD, entry );
+            journal.put( entryCsn.toString(), replicaEventMessage );
             journal.sync();
 
             entryCsn = csnFactory.newInstance();
         }
         
-        // Remove the first 500 modifications
-        Cursor<Tuple<String, Modification>> deleteCursor = journal.cursor();
+        // Remove the first 500 ReplicaEventMessages
+        Cursor<Tuple<String, ReplicaEventMessage>> deleteCursor = journal.cursor();
         int deleted = 0;
 
         while ( deleteCursor.next() && ( deleted < 500 ) )
         {
-            Tuple<String, Modification> tuple = deleteCursor.get();
-            Modification modification = tuple.getValue();
+            Tuple<String, ReplicaEventMessage> tuple = deleteCursor.get();
+            ReplicaEventMessage replicaEventMessage = tuple.getValue();
             
-            assertEquals( EventType.ADD, modification.getEventType());
-            assertEquals( "test" + deleted, modification.getEntry().get( "ou" ).getString() );
+            assertEquals( ChangeType.ADD, replicaEventMessage.getChangeType() );
+            assertEquals( "test" + deleted, replicaEventMessage.getEntry().get( "ou" ).getString() );
             
-            journal.remove( modification.getEntry().get( "entryCsn" ).getString() );
+            journal.remove( replicaEventMessage.getEntry().get( "entryCsn" ).getString() );
+            journal.sync();
             deleted++;
         }
         
         // Now check that the first mod is the 501th
         assertEquals( 500, journal.count() );
         
-        Cursor<Tuple<String, Modification>> cursor = journal.cursor();
+        Cursor<Tuple<String, ReplicaEventMessage>> cursor = journal.cursor();
         
         cursor.next();
 
-        Tuple<String, Modification> tuple = cursor.get();
-        Modification modification = tuple.getValue();
-        assertEquals( EventType.ADD, modification.getEventType() );
-        assertEquals( "test500", modification.getEntry().get( "ou" ).getString() );
+        Tuple<String, ReplicaEventMessage> tuple = cursor.get();
+        ReplicaEventMessage replicaEventMessage = tuple.getValue();
+        assertEquals( ChangeType.ADD, replicaEventMessage.getChangeType() );
+        assertEquals( "test500", replicaEventMessage.getEntry().get( "ou" ).getString() );
     }
 
 
@@ -281,9 +284,9 @@ public class JournalTest
      * Test the performances for 100 000 writes, read and delete.
      * On my laptop, it takes : <br>
      * <ul>
-     * <li>63,4 seconds to create 100 000 modifications ( 1577/s )</li>
-     * <li>18,9 seconds to read 100 000 modifications ( 5298/s )</li>
-     * <li>329 seconds to delete 100 000 modifications ( 303/s )</li>
+     * <li>457 seconds to create 100 000 ReplicaEventMessages ( 219/s )</li>
+     * <li>17 seconds to read 100 000 ReplicaEventMessages ( 5893/s )</li>
+     * <li>546 seconds to delete 100 000 ReplicaEventMessages ( 183/s )</li>
      * </ul>
      * 
      */
@@ -305,59 +308,63 @@ public class JournalTest
                 "entryCsn", entryCsn.toString()
                 );
             
-            Modification modification = new Modification( EventType.ADD, entry );
-            journal.put( entryCsn.toString(), modification );
+            ReplicaEventMessage replicaEventMessage = new ReplicaEventMessage( ChangeType.ADD, entry );
+            journal.put( entryCsn.toString(), replicaEventMessage );
             journal.sync();
+            recman.commit();
 
             entryCsn = csnFactory.newInstance();
         }
         
         long t1 = System.currentTimeMillis();
         
-        System.out.println( "Time to write 100 000 modifications : " + ( t1 - t0 ) );
+        System.out.println( "Time to write 100 000 ReplicaEventMessages : " + ( t1 - t0 ) );
         
         // The read perf
         long t2 = System.currentTimeMillis();
         
-        Cursor<Tuple<String, Modification>> readCursor = journal.cursor();
+        Cursor<Tuple<String, ReplicaEventMessage>> readCursor = journal.cursor();
 
         int pos = 0;
         
         while ( readCursor.next() )
         {
-            Tuple<String, Modification> tuple = readCursor.get();
-            Modification modification = tuple.getValue();
+            Tuple<String, ReplicaEventMessage> tuple = readCursor.get();
+            ReplicaEventMessage replicaEventMessage = tuple.getValue();
             
-            assertEquals( EventType.ADD, modification.getEventType());
-            assertEquals( "test" + pos, modification.getEntry().get( "ou" ).getString() );
+            assertEquals( ChangeType.ADD, replicaEventMessage.getChangeType());
+            assertEquals( "test" + pos, replicaEventMessage.getEntry().get( "ou" ).getString() );
             
             pos++;
         }
         
         long t3 = System.currentTimeMillis();
         
-        System.out.println( "Time to read 100 000 modifications : " + ( t3 - t2 ) );
+        System.out.println( "Time to read 100 000 ReplicaEventMessages : " + ( t3 - t2 ) );
 
         // The delete perf
         long t4 = System.currentTimeMillis();
         
-        Cursor<Tuple<String, Modification>> deleteCursor = journal.cursor();
+        Cursor<Tuple<String, ReplicaEventMessage>> deleteCursor = journal.cursor();
         int deleted = 0;
 
         while ( deleteCursor.next() )
         {
-            Tuple<String, Modification> tuple = deleteCursor.get();
-            Modification modification = tuple.getValue();
+            Tuple<String, ReplicaEventMessage> tuple = deleteCursor.get();
+            ReplicaEventMessage replicaEventMessage = tuple.getValue();
             
-            assertEquals( EventType.ADD, modification.getEventType());
-            assertEquals( "test" + deleted, modification.getEntry().get( "ou" ).getString() );
+            assertEquals( ChangeType.ADD, replicaEventMessage.getChangeType());
+            assertEquals( "test" + deleted, replicaEventMessage.getEntry().get( "ou" ).getString() );
             
-            journal.remove( modification.getEntry().get( "entryCsn" ).getString() );
+            journal.remove( replicaEventMessage.getEntry().get( "entryCsn" ).getString() );
+            journal.sync();
+            recman.commit();
+
             deleted++;
         }
 
         long t5 = System.currentTimeMillis();
 
-        System.out.println( "Time to delete 100 000 modifications : " + ( t5 - t4 ) );
+        System.out.println( "Time to delete 100 000 ReplicaEventMessages : " + ( t5 - t4 ) );
     }
 }

Modified: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java?rev=1157271&r1=1157270&r2=1157271&view=diff
==============================================================================
--- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java (original)
+++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientServerReplicationIT.java Fri Aug 12 22:23:35 2011
@@ -84,7 +84,6 @@ public class ClientServerReplicationIT
     public static void setUp() throws Exception
     {
         Class<?> justLoadToSetControlProperties = Class.forName( FrameworkRunner.class.getName() );
-        
         startProvider();
         startConsumer();
     }
@@ -145,6 +144,16 @@ public class ClientServerReplicationIT
      */
     private boolean checkEntryExistence( CoreSession session, Dn entryDn ) throws Exception
     {
+        return checkEntryExistence( session, entryDn, false );
+    }
+    
+    
+    /**
+     * Check that the entry exists in the target server. We wait up to 10 seconds, by
+     * 100ms steps, until either the entry s found, or we have exhausted the 10 seconds delay.
+     */
+    private boolean checkEntryExistence( CoreSession session, Dn entryDn, boolean print ) throws Exception
+    {
         boolean replicated = false;
         
         for ( int i = 0; i < 100; i++ )
@@ -153,6 +162,11 @@ public class ClientServerReplicationIT
             
             if ( session.exists( entryDn ) )
             {
+                if ( print )
+                {      
+                    System.out.println( entryDn.getName() + " exists " );
+                }
+                
                 replicated = true;
                 break;
             }
@@ -372,6 +386,7 @@ public class ClientServerReplicationIT
     
     
     @Test
+    @Ignore
     public void testRebootConsumer() throws Exception
     {
         System.out.println( "----> 1 testRebootConsumer started --------------------------------" );
@@ -452,7 +467,11 @@ public class ClientServerReplicationIT
     }
     
     
-    @CreateDS(allowAnonAccess = true, name = "provider-replication", partitions =
+    @CreateDS(
+        allowAnonAccess = true, 
+        name = "provider-replication", 
+        enableChangeLog = false,
+        partitions =
         {
             @CreatePartition(
                 name = "example",
@@ -502,7 +521,11 @@ public class ClientServerReplicationIT
     }
     
     
-    @CreateDS(allowAnonAccess = true, name = "consumer-replication", partitions =
+    @CreateDS(
+        allowAnonAccess = true, 
+        enableChangeLog = false,
+        name = "consumer-replication", 
+        partitions =
         {
             @CreatePartition(
                 name = "example",



Mime
View raw message