directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From elecha...@apache.org
Subject svn commit: r749816 [1/2] - in /directory/apacheds/branches/apacheds-replication: ./ core/src/main/java/org/apache/directory/server/core/ core/src/main/java/org/apache/directory/server/core/journal/ core/src/main/java/org/apache/directory/server/core/p...
Date Tue, 03 Mar 2009 22:52:43 GMT
Author: elecharny
Date: Tue Mar  3 22:52:42 2009
New Revision: 749816

URL: http://svn.apache.org/viewvc?rev=749816&view=rev
Log:
Injected the journal interceptor

Added:
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournal.java
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournalStore.java
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/Journal.java
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalEvent.java
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalInterceptor.java
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalStore.java
Modified:
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DirectoryService.java
    directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/partition/ByPassConstants.java
    directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/authz/support/MaxImmSubFilterTest.java
    directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/interceptor/InterceptorChainTest.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSN.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSNFactory.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationInterceptor.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageDecoder.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageEncoder.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerContextHandler.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationLogIterator.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationStore.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/test/java/org/apache/directory/mitosis/common/CSNTest.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/test/java/org/apache/directory/mitosis/service/ReplicationServiceITest.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/test/java/org/apache/directory/mitosis/service/protocol/codec/AbstractMessageCodecTest.java
    directory/apacheds/branches/apacheds-replication/pom.xml

Modified: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java (original)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java Tue Mar  3 22:52:42 2009
@@ -46,6 +46,9 @@
 import org.apache.directory.server.core.interceptor.context.EntryOperationContext;
 import org.apache.directory.server.core.interceptor.context.LookupOperationContext;
 import org.apache.directory.server.core.interceptor.context.RemoveContextPartitionOperationContext;
+import org.apache.directory.server.core.journal.DefaultJournal;
+import org.apache.directory.server.core.journal.Journal;
+import org.apache.directory.server.core.journal.JournalInterceptor;
 import org.apache.directory.server.core.normalization.NormalizationInterceptor;
 import org.apache.directory.server.core.operational.OperationalAttributeInterceptor;
 import org.apache.directory.server.core.partition.DefaultPartitionNexus;
@@ -149,6 +152,9 @@
     /** the change log service */
     private ChangeLog changeLog;
     
+    /** the journal service */
+    private Journal journal;
+    
     /** 
      * the interface used to perform various operations on this 
      * DirectoryService
@@ -200,6 +206,7 @@
     {
         setDefaultInterceptorConfigurations();
         changeLog = new DefaultChangeLog();
+        journal = new DefaultJournal();
         
         // --------------------------------------------------------------------
         // Load the bootstrap schemas to start up the schema partition
@@ -514,18 +521,42 @@
     }
 
 
+    /**
+     * {@inheritDoc}
+     */
     public ChangeLog getChangeLog()
     {
         return changeLog;
     }
 
 
+    /**
+     * {@inheritDoc}
+     */
+    public Journal getJournal()
+    {
+        return journal;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
     public void setChangeLog( ChangeLog changeLog )
     {
         this.changeLog = changeLog;
     }
 
 
+    /**
+     * {@inheritDoc}
+     */
+    public void setJournal( Journal journal )
+    {
+        this.journal = journal;
+    }
+
+
     public void addPartition( Partition parition ) throws Exception
     {
         partitions.add( parition );
@@ -579,6 +610,7 @@
         list.add( new CollectiveAttributeInterceptor() );
         list.add( new EventInterceptor() );
         list.add( new TriggerInterceptor() );
+        list.add( new JournalInterceptor() );
 
         setInterceptors( list );
     }
@@ -878,13 +910,20 @@
             return;
         }
 
-        this.changeLog.sync();
-        this.changeLog.destroy();
-
-        this.partitionNexus.sync();
-        this.partitionNexus.destroy();
-        this.interceptorChain.destroy();
-        this.started = false;
+        // Shutdown the changelog
+        changeLog.sync();
+        changeLog.destroy();
+        
+        // Shutdown the journal
+        journal.destroy();
+
+        // Shutdown the partition
+        partitionNexus.sync();
+        partitionNexus.destroy();
+        
+        // And shutdown the server
+        interceptorChain.destroy();
+        started = false;
         setDefaultInterceptorConfigurations();
     }
 
@@ -1447,6 +1486,12 @@
                 partitionNexus.getRootDSE( null ).getOriginalEntry().add( SchemaConstants.CHANGELOG_CONTEXT_AT, clSuffix );
             }
         }
+        
+        // Initialize the journal if it's enabled
+        //if ( journal.isEnabled() )
+        {
+            journal.init( this );
+        }
 
         if ( LOG.isDebugEnabled() )
         {

Modified: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DirectoryService.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DirectoryService.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DirectoryService.java (original)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/DirectoryService.java Tue Mar  3 22:52:42 2009
@@ -27,6 +27,7 @@
 import org.apache.directory.server.core.event.EventService;
 import org.apache.directory.server.core.interceptor.Interceptor;
 import org.apache.directory.server.core.interceptor.InterceptorChain;
+import org.apache.directory.server.core.journal.Journal;
 import org.apache.directory.server.core.partition.Partition;
 import org.apache.directory.server.core.partition.PartitionNexus;
 import org.apache.directory.server.core.schema.SchemaService;
@@ -378,7 +379,7 @@
     /**
      * Gets the ChangeLog service for this DirectoryService used for tracking
      * changes (revisions) to the server and using them to revert the server
-     * to earier revisions.
+     * to earlier revisions.
      *
      * @return the change log service
      */
@@ -386,6 +387,15 @@
 
 
     /**
+     * Gets the Journal service for this DirectoryService used for tracking
+     * changes to the server.
+     *
+     * @return the journal service
+     */
+    Journal getJournal();
+
+
+    /**
      * Sets the ChangeLog service for this DirectoryService used for tracking
      * changes (revisions) to the server and using them to revert the server
      * to earier revisions.

Added: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournal.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournal.java?rev=749816&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournal.java (added)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournal.java Tue Mar  3 22:52:42 2009
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.directory.server.core.journal;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.authn.LdapPrincipal;
+import org.apache.directory.shared.ldap.ldif.LdifEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The default journal implementation. It stores the operation and the
+ * associated status (acked or nacked) in a file which will be used to
+ * restore the server if it crashes.
+ * 
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class DefaultJournal implements Journal
+{
+    /** The class logger */
+    private static final Logger LOG = LoggerFactory.getLogger( DefaultJournal.class );
+
+    /** Tells if the service is activated or not */ 
+    private boolean enabled;
+
+    /** An instance of the Journal store */
+    private JournalStore store;
+
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void destroy() throws Exception
+    {
+        LOG.debug( "Stopping the journal" );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public JournalStore getJournalStore()
+    {
+        return store;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void init( DirectoryService directoryService ) throws Exception
+    {
+        LOG.debug( "Starting the journal" );
+
+        store = new DefaultJournalStore();
+        store.init( directoryService );
+
+        LOG.debug( "The Journal service has been initialized" );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isEnabled()
+    {
+        return enabled;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void log( LdapPrincipal principal, long revision, LdifEntry entry ) throws Exception
+    {
+        store.log( principal, revision, entry );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void ack( long revision )
+    {
+        store.ack( revision );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void nack( long revision )
+    {
+        store.nack( revision );
+    }
+
+
+    public void setEnabled( boolean enabled )
+    {
+        // TODO Auto-generated method stub
+    }
+
+
+    public void setJournalStore( JournalStore store )
+    {
+        // TODO Auto-generated method stub
+    }
+}

Added: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournalStore.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournalStore.java?rev=749816&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournalStore.java (added)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/DefaultJournalStore.java Tue Mar  3 22:52:42 2009
@@ -0,0 +1,227 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.journal;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Writer;
+
+import javax.naming.NamingException;
+
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.authn.LdapPrincipal;
+import org.apache.directory.shared.ldap.ldif.LdifEntry;
+import org.apache.directory.shared.ldap.ldif.LdifUtils;
+
+/**
+ * 
+ * @author elecharny
+ *
+  * @org.apache.xbean.XBean
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+*/
+public class DefaultJournalStore implements JournalStore
+{
+    /** The directory where the journal is stored */
+    private File workingDirectory;
+    
+    /** The journal file name */
+    private String fileName;
+    
+    /** The file containing the journal */
+    private File journal;
+    
+    /** The stream used to write data into the journal */
+    private Writer writer;
+
+    
+    public void destroy() throws Exception
+    {
+        if ( writer != null )
+        {
+            writer.close();
+        }
+        
+    }
+    
+    
+    /**
+     * Initialize the interceptor
+     */
+    public void init( DirectoryService service ) throws Exception
+    {
+        workingDirectory = service.getWorkingDirectory();
+
+        /** Load or create the journal file */
+        if ( fileName == null )
+        {
+            fileName = "journal.ldif";
+        }
+        
+        journal = new File( workingDirectory, fileName );
+        
+        // The new requests are added at the end of the existing journal
+        writer = new PrintWriter( 
+            new OutputStreamWriter(
+                new FileOutputStream( journal, true ) ) );
+    }
+    
+    
+    /**
+     * Stores an event into the journal.
+     * 
+     * @param principal The principal who is logging the change
+     * @param revision The operation revision
+     * @param forward The change to log
+     */
+    public boolean log( LdapPrincipal principal, long revision, LdifEntry forward )
+    {
+        synchronized ( writer )
+        {
+            try
+            {
+                // Write the LdapPrincipal
+                writer.write( "# principal: " );
+                writer.write( principal.getJndiName().toString() );
+                writer.write( '\n' );
+                
+                // Write the timestamp
+                writer.write( "# timestamp: " );
+                writer.write( Long.toString( System.currentTimeMillis() ) );
+                writer.write( '\n' );
+                
+                // Write the revision
+                writer.write( "# revision: " );
+                writer.write( Long.toString( revision ) );
+                writer.write( "\n" );
+                
+                // Write the entry
+                writer.write( LdifUtils.convertToLdif( forward, 80 ) );
+                writer.flush();
+            }
+            catch ( NamingException ne )
+            {
+                return false;
+            }
+            catch ( IOException ioe )
+            {
+                return false;
+            }
+        }
+        
+        return true;
+    }
+    
+    
+    /**
+     * Records a ack for a change
+     *
+     * @param revision The change revision which is acked
+     * @return <code>true</code> if the ack has been written
+     * @throws Exception if there are problems logging the ack
+     */
+    public boolean ack( long revision )
+    {
+        synchronized ( writer )
+        {
+            try
+            {
+                // Write the revision
+                writer.write( "# ack-revision: " );
+                writer.write( Long.toString( revision ) );
+                writer.write( "\n\n" );
+
+                writer.flush();
+            }
+            catch ( IOException ioe )
+            {
+                return false;
+            }
+        }
+        
+        return true;
+    }
+    
+    
+    /**
+     * Records a nack for a change
+     *
+     * @param revision The change revision which is nacked
+     * @return <code>true</code> if the nack has been written
+     * @throws Exception if there are problems logging the nack
+     */
+    public boolean nack( long revision )
+    {
+        synchronized ( writer )
+        {
+            try
+            {
+                // Write the revision
+                writer.write( "# nack-revision: " );
+                writer.write( Long.toString( revision ) );
+                writer.write( "\n\n" );
+
+                writer.flush();
+            }
+            catch ( IOException ioe )
+            {
+                return false;
+            }
+        }
+        
+        return true;
+    }
+
+    
+    public void sync() throws Exception
+    {
+        // TODO Auto-generated method stub
+        
+    }
+    
+    public long getCurrentRevision()
+    {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+
+    /**
+     * @return the fileName
+     */
+    public String getFileName()
+    {
+        return fileName;
+    }
+
+
+    /**
+     * @param fileName the fileName to set
+     */
+    public void setFileName( String fileName )
+    {
+        this.fileName = fileName;
+    }
+}

Added: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/Journal.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/Journal.java?rev=749816&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/Journal.java (added)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/Journal.java Tue Mar  3 22:52:42 2009
@@ -0,0 +1,105 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.journal;
+
+
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.authn.LdapPrincipal;
+import org.apache.directory.shared.ldap.ldif.LdifEntry;
+
+
+/**
+ * A facade for the Journal subsystem.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public interface Journal
+{
+    /**
+     * Checks whether or not the Journal has been enabled.
+     *
+     * @return true if the Journal is logging changes, false otherwise
+     */
+    boolean isEnabled();
+
+
+    /**
+     * Enable or disable the Journal service
+     * @param enabled true to enable the service, false to disable it
+     */
+    void setEnabled( boolean enabled );
+    
+
+    /**
+     * @return The underlying storage
+     */
+    JournalStore getJournalStore();
+
+
+    /**
+     * Set the underlying storage
+     * @param store The storage
+     */
+    void setJournalStore( JournalStore store );
+
+
+    /**
+     * Records a change as an LDIF entry.
+     *
+     * @param principal the authorized LDAP principal triggering the change
+     * @param revision the operation revision
+     * @param forward LDIF of the change going to the next state
+     * @throws Exception if there are problems logging the change
+     */
+    void log( LdapPrincipal principal, long revision, LdifEntry entry ) throws Exception;
+
+    
+    /**
+     * Records a ack for a change
+     *
+     * @param revision The change revision which is acked
+     */
+    void ack( long revision );
+
+    
+    /**
+     * Records a nack for a change
+     *
+     * @param revision The change revision which is acked
+     */
+    void nack( long revision );
+
+    
+    /**
+     * Initialize the Journal.
+     * 
+     * @param service The associated DirectoryService
+     * @throws Exception If something went wrong 
+     */
+    void init( DirectoryService service ) throws Exception;
+
+    
+    /**
+     * Destroy the journal service
+     * @throws Exception If something went wrong
+     */
+    void destroy() throws Exception;
+}

Added: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalEvent.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalEvent.java?rev=749816&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalEvent.java (added)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalEvent.java Tue Mar  3 22:52:42 2009
@@ -0,0 +1,6 @@
+package org.apache.directory.server.core.journal;
+
+public class JournalEvent
+{
+
+}

Added: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalInterceptor.java?rev=749816&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalInterceptor.java (added)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalInterceptor.java Tue Mar  3 22:52:42 2009
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.directory.server.core.journal;
+
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.entry.ServerAttribute;
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.interceptor.BaseInterceptor;
+import org.apache.directory.server.core.interceptor.NextInterceptor;
+import org.apache.directory.server.core.interceptor.context.AddOperationContext;
+import org.apache.directory.server.core.interceptor.context.DeleteOperationContext;
+import org.apache.directory.server.core.interceptor.context.ModifyOperationContext;
+import org.apache.directory.server.core.interceptor.context.MoveAndRenameOperationContext;
+import org.apache.directory.server.core.interceptor.context.MoveOperationContext;
+import org.apache.directory.server.core.interceptor.context.RenameOperationContext;
+import org.apache.directory.shared.ldap.entry.Modification;
+import org.apache.directory.shared.ldap.ldif.ChangeType;
+import org.apache.directory.shared.ldap.ldif.LdifEntry;
+import org.apache.directory.shared.ldap.schema.AttributeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An interceptor which intercepts write operations to the directory and
+ * logs them into a journal.
+ * 
+ * @org.apache.xbean.XBean
+ */
+public class JournalInterceptor extends BaseInterceptor
+{
+    /** for debugging */
+    private static final Logger LOG = LoggerFactory.getLogger( JournalInterceptor.class );
+    
+    /** A flag set to true if the journal interceptor is enabled */
+    private boolean journalEnabled;
+    
+    /** A shared number stored within each change */ 
+    private AtomicLong revision;
+    
+    /** 
+     * A parameter indicating the number of operations stored in a journal
+     * before it is rotated. If set to 0, no rotation is done
+     */ 
+    private int rotation;
+    
+    /** the Journal service to log changes to */
+    private Journal journal;
+    
+
+    // -----------------------------------------------------------------------
+    // Overridden init() and destroy() methods
+    // -----------------------------------------------------------------------
+    /**
+     * The init method will initialize the local variables and load the 
+     * entryDeleted AttributeType.
+     */
+    public void init( DirectoryService directoryService ) throws Exception
+    {
+        super.init( directoryService );
+        
+        //if ( directoryService.getJournal().isEnabled() )
+        {
+            journalEnabled = true; 
+            journal = directoryService.getJournal();
+            revision = new AtomicLong( System.currentTimeMillis() );
+        }
+
+        LOG.debug( "JournalInterceptor has been initialized" );
+    }
+    
+    
+    /**
+     * Log the operation, manage the logs rotations.
+     */
+    private void log( long revision, LdifEntry ldif ) throws Exception
+    {
+        journal.log( getPrincipal(), revision, ldif );
+    }
+    
+    
+    // -----------------------------------------------------------------------
+    // Overridden (only change inducing) intercepted methods
+    // -----------------------------------------------------------------------
+    /**
+     * {@inheritDoc}
+     */
+    public void add( NextInterceptor next, AddOperationContext opContext ) throws Exception
+    {
+        long opRevision = 0;
+        
+        if ( journalEnabled )
+        {
+            opRevision = revision.incrementAndGet();
+            
+            // Store the added entry
+            ServerEntry addEntry = opContext.getEntry();
+
+            LdifEntry ldif = new LdifEntry();
+            ldif.setChangeType( ChangeType.Add );
+            ldif.setDn( opContext.getDn() );
+
+            Set<AttributeType> list = addEntry.getAttributeTypes();
+            
+            for ( AttributeType attributeType:list )
+            {
+                ldif.addAttribute( ((ServerAttribute)addEntry.get( attributeType) ).toClientAttribute() );
+            }
+            
+            log( opRevision, ldif );
+        }
+
+        try
+        {
+            next.add( opContext );
+
+            if ( journalEnabled )
+            {
+                // log the ACK
+                journal.ack( opRevision );
+            }
+        }
+        catch( Exception e )
+        {
+            if ( journalEnabled )
+            {
+                // log the NACK
+                journal.nack( opRevision );
+            }
+            throw e;
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void delete( NextInterceptor next, DeleteOperationContext opContext ) throws Exception
+    {
+        long opRevision = 0;
+        
+        if ( journalEnabled )
+        {
+            opRevision = revision.incrementAndGet();
+            
+            // Store the deleted entry
+            LdifEntry ldif = new LdifEntry();
+            ldif.setChangeType( ChangeType.Delete );
+            ldif.setDn( opContext.getDn() );
+            
+            journal.log( getPrincipal(), opRevision, ldif );
+        }
+
+        try
+        {
+            next.delete( opContext );
+
+            if ( journalEnabled )
+            {
+                // log the ACK
+                journal.ack( opRevision );
+            }
+        }
+        catch( Exception e )
+        {
+            if ( journalEnabled )
+            {
+                // log the NACK
+                journal.nack( opRevision );
+            }
+            throw e;
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void modify( NextInterceptor next, ModifyOperationContext opContext ) throws Exception
+    {
+        long opRevision = 0;
+        
+        if ( journalEnabled )
+        {
+            opRevision = revision.incrementAndGet();
+            
+            // Store the modified entry
+            LdifEntry ldif = new LdifEntry();
+            ldif.setChangeType( ChangeType.Modify );
+            ldif.setDn( opContext.getDn() );
+            
+            // Store the modifications 
+            for ( Modification modification:opContext.getModItems() )
+            {
+                ldif.addModificationItem( modification );
+            }
+            
+            journal.log( getPrincipal(), opRevision, ldif );
+        }
+        
+        try
+        {
+            next.modify( opContext );
+
+            if ( journalEnabled )
+            {
+                // log the ACK
+                journal.ack( opRevision );
+            }
+        }
+        catch( Exception e )
+        {
+            if ( journalEnabled )
+            {
+                // log the NACK
+                journal.nack( opRevision );
+            }
+            throw e;
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void rename ( NextInterceptor next, RenameOperationContext opContext ) throws Exception
+    {
+        long opRevision = 0;
+        
+        if ( journalEnabled )
+        {
+            opRevision = revision.incrementAndGet();
+            
+            // Store the renamed entry
+            LdifEntry ldif = new LdifEntry();
+            ldif.setChangeType( ChangeType.ModRdn );
+            ldif.setDn( opContext.getDn() );
+            ldif.setNewRdn( opContext.getNewRdn().toString() );
+            ldif.setDeleteOldRdn( opContext.getDelOldDn() );
+            
+            journal.log( getPrincipal(), opRevision, ldif );
+        }
+        
+        try
+        {
+            next.rename( opContext );
+    
+            if ( journalEnabled )
+            {
+                // log the ACK
+                journal.ack( opRevision );
+            }
+        }
+        catch( Exception e )
+        {
+            if ( journalEnabled )
+            {
+                // log the NACK
+                journal.nack( opRevision );
+            }
+            throw e;
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void moveAndRename( NextInterceptor next, MoveAndRenameOperationContext opContext )
+        throws Exception
+    {
+        long opRevision = 0;
+        
+        if ( journalEnabled )
+        {
+            opRevision = revision.incrementAndGet();
+            
+            // Store the renamed entry
+            LdifEntry ldif = new LdifEntry();
+            ldif.setChangeType( ChangeType.ModDn );
+            ldif.setDn( opContext.getDn() );
+            ldif.setNewRdn( opContext.getNewRdn().toString() );
+            ldif.setDeleteOldRdn( opContext.getDelOldDn() );
+            ldif.setNewSuperior( opContext.getNewDn().toString() );
+            
+            journal.log( getPrincipal(), opRevision, ldif );
+        }
+        
+        try
+        {
+            next.moveAndRename( opContext );
+            
+            if ( journalEnabled )
+            {
+                // log the ACK
+                journal.ack( opRevision );
+            }
+        }
+        catch( Exception e )
+        {
+            if ( journalEnabled )
+            {
+                // log the NACK
+                journal.nack( opRevision );
+            }
+            throw e;
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void move( NextInterceptor next, MoveOperationContext opContext ) throws Exception
+    {
+        long opRevision = 0;
+        
+        if ( journalEnabled )
+        {
+            opRevision = revision.incrementAndGet();
+            
+            // Store the moved entry
+            LdifEntry ldif = new LdifEntry();
+            ldif.setChangeType( ChangeType.ModDn );
+            ldif.setDn( opContext.getDn() );
+            ldif.setNewSuperior( opContext.getParent().toString() );
+            
+            journal.log( getPrincipal(), opRevision, ldif );
+        }
+        
+        try
+        {
+            next.move( opContext );
+            
+            if ( journalEnabled )
+            {
+                // log the ACK
+                journal.ack( opRevision );
+            }
+        }
+        catch( Exception e )
+        {
+            if ( journalEnabled )
+            {
+                // log the NACK
+                journal.nack( opRevision );
+            }
+            throw e;
+        }
+   }
+
+
+    /**
+     * @return the rotation
+     */
+    public int getRotation()
+    {
+        return rotation;
+    }
+
+
+    /**
+     * @param rotation the rotation to set
+     */
+    public void setRotation( int rotation )
+    {
+        this.rotation = rotation;
+    }
+}
\ No newline at end of file

Added: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalStore.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalStore.java?rev=749816&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalStore.java (added)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/journal/JournalStore.java Tue Mar  3 22:52:42 2009
@@ -0,0 +1,101 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.journal;
+
+
+import org.apache.directory.server.core.authn.LdapPrincipal;
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.shared.ldap.ldif.LdifEntry;
+
+
+
+/**
+ * A store for change events on the directory which exposes methods for 
+ * managing, querying and in general performing legal operations on the log.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public interface JournalStore 
+{
+    /**
+     * Initialize the store.
+     * 
+     * @param service The associated DirectoryService
+     * @throws Exception If the initialization failed
+     */
+    void init( DirectoryService service ) throws Exception;
+
+
+    /**
+     * Write the changes on disk
+     * 
+     * @throws Exception If the write failed
+     */
+    void sync() throws Exception;
+
+
+    /**
+     * Destroy the logs. 
+     * 
+     * @throws Exception If we can't destroy the logs
+     */
+    void destroy() throws Exception;
+
+
+    /**
+     * Gets the current revision of the server (a.k.a. the HEAD revision).
+     *
+     * @return the current revision of the server
+     */
+    long getCurrentRevision();
+
+
+    /**
+     * Records a change as a forward LDIF and the authorized principal
+     *
+     * @param principal The principal who is logging the change
+     * @param revision The operation revision
+     * @param forward The change to log
+     * @return <code>true</code> if the entry has been written
+     */
+    boolean log( LdapPrincipal principal, long revision, LdifEntry forward );
+
+
+    /**
+     * Records a ack for a change
+     *
+     * @param revision The change revision which is acked
+     * @return <code>true</code> if the ack has been written
+     */
+    boolean ack( long revision );
+    
+    
+    /**
+     * Records a nack for a change
+     *
+     * @param revision The change revision which is nacked
+     * @return <code>true</code> if the nack has been written
+     * @throws Exception if there are problems logging the nack
+     */
+    boolean nack( long revision );
+}

Modified: directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/partition/ByPassConstants.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/partition/ByPassConstants.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/partition/ByPassConstants.java (original)
+++ directory/apacheds/branches/apacheds-replication/core/src/main/java/org/apache/directory/server/core/partition/ByPassConstants.java Tue Mar  3 22:52:42 2009
@@ -32,6 +32,7 @@
 import org.apache.directory.server.core.event.EventInterceptor;
 import org.apache.directory.server.core.exception.ExceptionInterceptor;
 import org.apache.directory.server.core.interceptor.Interceptor;
+import org.apache.directory.server.core.journal.JournalInterceptor;
 import org.apache.directory.server.core.normalization.NormalizationInterceptor;
 import org.apache.directory.server.core.operational.OperationalAttributeInterceptor;
 import org.apache.directory.server.core.schema.SchemaInterceptor;
@@ -98,6 +99,7 @@
         c.add( SchemaInterceptor.class.getName() );
         c.add( SubentryInterceptor.class.getName() );
         c.add( EventInterceptor.class.getName() );
+        c.add( JournalInterceptor.class.getName() );
         LOOKUP_BYPASS = Collections.unmodifiableCollection( c );
 
         c = new HashSet<String>();
@@ -110,6 +112,7 @@
         c.add( SchemaInterceptor.class.getName() );
         c.add( SubentryInterceptor.class.getName() );
         c.add( EventInterceptor.class.getName() );
+        c.add( JournalInterceptor.class.getName() );
         HAS_ENTRY_BYPASS = Collections.unmodifiableCollection( c );
 
         c = new HashSet<String>();
@@ -122,6 +125,7 @@
         c.add( SchemaInterceptor.class.getName() );
         c.add( SubentryInterceptor.class.getName() );
         c.add( EventInterceptor.class.getName() );
+        c.add( JournalInterceptor.class.getName() );
         LOOKUP_COLLECTIVE_BYPASS = Collections.unmodifiableCollection( c );
 
         c = new HashSet<String>();
@@ -132,6 +136,7 @@
         c.add( OperationalAttributeInterceptor.class.getName() );
         c.add( SubentryInterceptor.class.getName() );
         c.add( EventInterceptor.class.getName() );
+        c.add( JournalInterceptor.class.getName() );
         GETMATCHEDDN_BYPASS = Collections.unmodifiableCollection( c );
 
         c = new HashSet<String>();
@@ -143,6 +148,7 @@
         c.add( SubentryInterceptor.class.getName() );
         c.add( EventInterceptor.class.getName() );
         c.add( TriggerInterceptor.class.getName() );
+        c.add( JournalInterceptor.class.getName() );
         LOOKUP_EXCLUDING_OPR_ATTRS_BYPASS = Collections.unmodifiableCollection( c );
         
         c = new HashSet<String>();
@@ -157,6 +163,7 @@
         c.add( CollectiveAttributeInterceptor.class.getName() );
         c.add( EventInterceptor.class.getName() );
         c.add( TriggerInterceptor.class.getName() );
+        c.add( JournalInterceptor.class.getName() );
         GET_ROOT_DSE_BYPASS = Collections.unmodifiableCollection( c );
     }
 }

Modified: directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/authz/support/MaxImmSubFilterTest.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/authz/support/MaxImmSubFilterTest.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/authz/support/MaxImmSubFilterTest.java (original)
+++ directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/authz/support/MaxImmSubFilterTest.java Tue Mar  3 22:52:42 2009
@@ -59,6 +59,7 @@
 import org.apache.directory.server.core.interceptor.context.RenameOperationContext;
 import org.apache.directory.server.core.interceptor.context.SearchOperationContext;
 import org.apache.directory.server.core.interceptor.context.UnbindOperationContext;
+import org.apache.directory.server.core.journal.Journal;
 import org.apache.directory.server.core.partition.Partition;
 import org.apache.directory.server.core.partition.PartitionNexus;
 import org.apache.directory.server.core.schema.SchemaOperationControl;
@@ -762,6 +763,12 @@
         }
 
 
+        public Journal getJournal()
+        {
+            return null;
+        }
+
+
         public ServerEntry newEntry( LdapDN dn ) throws NamingException
         {
             return null;

Modified: directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/interceptor/InterceptorChainTest.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/interceptor/InterceptorChainTest.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/interceptor/InterceptorChainTest.java (original)
+++ directory/apacheds/branches/apacheds-replication/core/src/test/java/org/apache/directory/server/core/interceptor/InterceptorChainTest.java Tue Mar  3 22:52:42 2009
@@ -34,6 +34,7 @@
 import org.apache.directory.server.core.event.EventService;
 import org.apache.directory.server.core.interceptor.context.LookupOperationContext;
 import org.apache.directory.server.core.invocation.InvocationStack;
+import org.apache.directory.server.core.journal.Journal;
 import org.apache.directory.server.core.partition.ByPassConstants;
 import org.apache.directory.server.core.partition.Partition;
 import org.apache.directory.server.core.partition.PartitionNexus;
@@ -555,6 +556,7 @@
         {
             
         }
+
         
         public ChangeLog getChangeLog()
         {
@@ -562,6 +564,12 @@
         }
 
 
+        public Journal getJournal()
+        {
+            return null;
+        }
+
+
         public ServerEntry newEntry( LdapDN dn ) throws NamingException
         {
             return null;

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSN.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSN.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSN.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSN.java Tue Mar  3 22:52:42 2009
@@ -21,6 +21,13 @@
 
 
 import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.directory.shared.ldap.util.StringTools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -29,16 +36,15 @@
  * A CSN is a composition of a timestamp, a replica ID and a 
  * operation sequence number.
  * 
- * It's described in http://tools.ietf.org/html/draft-sermersheim-ldap-csn-02.
+ * It's described in http://tools.ietf.org/html/draft-ietf-ldup-model-09.
  * 
- * The ASN.1 syntax is :
+ * The CSN syntax is :
  * <pre>
- * ChangeSequenceNumber ::= SEQUENCE {
- *     time GeneralizedTime,
- *     timeCount INTEGER (0 ..  MaxInt),
- *     replicaID UTF8String,
- *     changeCount INTEGER (0 ..  MaxInt)
- * }
+ * <CSN>            ::= <timestamp> # <changeCount> # <replicaId> # <modifierNumber>
+ * <timestamp>      ::= A GMT based time, YYYYMMDDhh:mm:ssz
+ * <changeCount>    ::= [0-9a-zA-Z]+
+ * <replicaId>      ::= IA5String
+ * <modifierNumber> ::= [0-9a-zA-Z]+
  * </pre>
  *  
  * It distinguishes a change made on an object on a server,
@@ -58,14 +64,20 @@
      */
     private static final long serialVersionUID = 1L;
 
+    /** The logger for this class */
+    private static final Logger LOG = LoggerFactory.getLogger( CSN.class );
+
     /** The timeStamp of this operation */
     private final long timestamp;
 
     /** The server identification */
     private final String replicaId;
 
-    /** The operation number in the same timestamp */
-    private final int operationSequence;
+    /** The operation number in a modification operation */
+    private final int operationNumber;
+    
+    /** The changeCount to distinguish operations done in the same second */
+    private final int changeCount;  
 
     /** Stores the String representation of the CSN */
     private transient String csnStr;
@@ -73,20 +85,25 @@
     /** Stores the byte array representation of the CSN */
     private transient byte[] bytes;
 
+    /** The Timestamp syntax. The last 'z' is _not_ the Time Zone */
+    private static final SimpleDateFormat sdf = new SimpleDateFormat( "yyyyMMddHH:mm:ss'z'" );
+
 
     /**
      * Creates a new instance.
      * <b>This method should be used only for deserializing a CSN</b> 
      * 
      * @param timestamp GMT timestamp of modification
+     * @param changeCount The operation increment
      * @param replicaId Replica ID where modification occurred (<tt>[-_A-Za-z0-9]{1,16}</tt>)
-     * @param operationSequence Operation sequence
+     * @param operationNumber Operation number in a modification operation
      */
-    public CSN( long timestamp, String replicaId, int operationSequence )
+    public CSN( long timestamp, int changeCount, String replicaId, int operationNumber )
     {
         this.timestamp = timestamp;
         this.replicaId = replicaId;
-        this.operationSequence = operationSequence;
+        this.operationNumber = operationNumber;
+        this.changeCount = changeCount;
     }
 
 
@@ -94,51 +111,131 @@
      * Creates a new instance of SimpleCSN from a String.
      * 
      * The string format must be :
-     * &lt;timestamp> : &lt;replica ID> : &lt;operation sequence>
+     * &lt;timestamp> # &lt;changeCount> # &lt;replica ID> # &lt;operation number>
      *
      * @param value The String containing the CSN
      */
     public CSN( String value ) throws InvalidCSNException
     {
-        assert value != null;
-
-        int sepTS = value.indexOf( ':' );
-
-        assert sepTS > 0;
+        if ( StringTools.isEmpty( value ) )
+        {
+            String message = "The CSN must not be null or empty";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
+        }
 
-        int sepID = value.lastIndexOf( ':' );
+        // Get the Timestamp
+        int sepTS = value.indexOf( '#' );
+        
+        if ( sepTS < 0 )
+        {
+            String message = "Cannot find a '#' in the CSN '" + value + "'";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
+        }
+        
+        String timestampStr = value.substring( 0, sepTS ).trim();
+        
+        synchronized ( sdf )
+        {
+            try
+            {
+                timestamp = sdf.parse( timestampStr ).getTime();
+            }
+            catch ( ParseException pe )
+            {
+                String message = "Cannot parse the timestamp: '" + timestampStr + "'";
+                LOG.error( message );
+                throw new InvalidCSNException( message );
+            }
+        }
 
-        if ( ( sepID == -1 ) || ( sepID == sepTS ) | ( sepID - sepTS < 2 ) )
+        // Get the changeCount. It should be an hex number prefixed with '0x'
+        int sepCC = value.indexOf( '#', sepTS + 1 );
+        
+        if ( sepCC < 0 )
         {
-            throw new InvalidCSNException();
+            String message = "Missing a '#' in the CSN '" + value + "'";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
         }
 
+        String changeCountStr = value.substring( sepTS + 1, sepCC ).trim();
+        
+        if ( !changeCountStr.startsWith( "0x" ) )
+        {
+            String message = "The changeCount '" + changeCountStr + "' is not a valid number";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
+        }
+        
         try
         {
-            timestamp = Long.parseLong( value.substring( 0, sepTS ), 16 );
+            changeCount = Integer.parseInt( changeCountStr.substring( 2 ), 16 ); 
+        }
+        catch ( NumberFormatException nfe )
+        {
+            String message = "The changeCount '" + changeCountStr + "' is not a valid number";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
         }
-        catch ( NumberFormatException ife )
+        
+        // Get the replicaID
+        int sepRI = value.indexOf( '#', sepCC + 1 );
+        
+        if ( sepRI < 0 )
         {
-            throw new InvalidCSNException();
+            String message = "Missing a '#' in the CSN '" + value + "'";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
         }
 
-        try
+        replicaId = value.substring( sepCC + 1, sepRI).trim();
+        
+        if ( StringTools.isEmpty( replicaId ) )
+        {
+            String message = "The replicaID must not be null or empty";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
+        }
+        
+        if ( !StringTools.isIA5String( replicaId ) )
         {
-            replicaId = value.substring( sepTS + 1, sepID );
+            String message = "The replicaID must contains only alphanumeric characters";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
         }
-        catch ( IllegalArgumentException iae )
+        
+        // Get the modification number
+        if ( sepCC == value.length() )
         {
-            throw new InvalidCSNException();
+            String message = "The operationNumber is absent";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
         }
-
+        
+        String operationNumberStr = value.substring( sepRI + 1 ).trim();
+        
+        if ( !operationNumberStr.startsWith( "0x" ) )
+        {
+            String message = "The operationNumber '" + operationNumberStr + "' is not a valid number";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
+        }
+        
         try
         {
-            operationSequence = Integer.parseInt( value.substring( sepID + 1 ), 16 );
+            operationNumber = Integer.parseInt( operationNumberStr.substring( 2 ), 16 ); 
         }
-        catch ( NumberFormatException ife )
+        catch ( NumberFormatException nfe )
         {
-            throw new InvalidCSNException();
+            String message = "The operationNumber '" + operationNumberStr + "' is not a valid number";
+            LOG.error( message );
+            throw new InvalidCSNException( message );
         }
+        
+        csnStr = value;
+        bytes = toBytes();
     }
 
 
@@ -149,53 +246,20 @@
      */
     /** Package protected */ CSN( byte[] value )
     {
-        timestamp = ( ( long ) ( value[0] & 0x00FF ) << 56 ) | ( ( long ) ( value[1] & 0x00FF ) << 48 )
-            | ( ( long ) ( value[2] & 0x00FF ) << 40 ) | ( ( long ) ( value[3] & 0x00FF ) << 32 )
-            | ( ( value[4] << 24 ) & 0x00000000FF000000L ) | ( ( value[5] << 16 ) & 0x0000000000FF0000L )
-            | ( ( value[6] << 8 ) & 0x000000000000FF00L ) | ( value[7] & 0x00000000000000FFL );
-
-        operationSequence = ( ( value[8] & 0x00FF ) << 24 ) + ( ( value[9] & 0x00FF ) << 16 )
-            + ( ( value[10] & 0x00FF ) << 8 ) + ( value[11] & 0x00FF );
-
-        char[] chars = new char[value.length - 12];
-
-        for ( int i = 12; i < value.length; i++ )
-        {
-            chars[i - 12] = ( char ) ( value[i] & 0x00FF );
-        }
-
-        replicaId = new String( chars );
-        bytes = value;
-    }
-
-
-    /**
-     * Return the CSN as a formated string. The used format is :
-     * &lt;timestamp> ':' &lt;replicaId> ':' &lt;operation sequence>
-     * 
-     * @return The CSN as a String
-     */
-    public String toOctetString()
-    {
-        if ( csnStr == null )
-        {
-            StringBuilder buf = new StringBuilder( 40 );
-            buf.append( timestamp );
-            buf.append( ':' );
-            buf.append( replicaId );
-            buf.append( ':' );
-            buf.append( operationSequence );
-            csnStr = buf.toString();
-        }
-
-        return csnStr;
+        csnStr = StringTools.utf8ToString( value );
+        CSN csn = new CSN( csnStr );
+        timestamp = csn.timestamp;
+        changeCount = csn.changeCount;
+        replicaId = csn.replicaId;
+        operationNumber = csn.operationNumber;
+        bytes = toBytes();
     }
 
 
     /**
      * Get the CSN as a byte array. The data are stored as :
      * bytes 1 to 8  : timestamp, big-endian
-     * bytes 9 to 12 : operation sequence, big-endian
+     * bytes 9 to 12 : change count, big endian
      * bytes 13 to ... : ReplicaId 
      * 
      * @return A byte array representing theCSN
@@ -204,28 +268,7 @@
     {
         if ( bytes == null )
         {
-            String id = replicaId;
-            byte[] bb = new byte[8 + id.length() + 4];
-
-            bb[0] = ( byte ) ( timestamp >> 56 );
-            bb[1] = ( byte ) ( timestamp >> 48 );
-            bb[2] = ( byte ) ( timestamp >> 40 );
-            bb[3] = ( byte ) ( timestamp >> 32 );
-            bb[4] = ( byte ) ( timestamp >> 24 );
-            bb[5] = ( byte ) ( timestamp >> 16 );
-            bb[6] = ( byte ) ( timestamp >> 8 );
-            bb[7] = ( byte ) timestamp;
-            bb[8] = ( byte ) ( ( operationSequence >> 24 ) );
-            bb[9] = ( byte ) ( ( operationSequence >> 16 ) );
-            bb[10] = ( byte ) ( ( operationSequence >> 8 ) );
-            bb[11] = ( byte ) ( operationSequence );
-
-            for ( int i = 0; i < id.length(); i++ )
-            {
-                bb[12 + i] = ( byte ) id.charAt( i );
-            }
-
-            bytes = bb;
+            bytes = StringTools.getBytesUtf8( csnStr );
         }
 
         return bytes;
@@ -242,6 +285,15 @@
 
 
     /**
+     * @return The changeCount
+     */
+    public int getChangeCount()
+    {
+        return changeCount;
+    }
+
+
+    /**
      * @return The replicaId
      */
     public String getReplicaId()
@@ -251,11 +303,11 @@
 
 
     /**
-     * @return The operation sequence
+     * @return The operation number
      */
-    public int getOperationSequence()
+    public int getOperationNumber()
     {
-        return operationSequence;
+        return operationNumber;
     }
 
 
@@ -264,7 +316,25 @@
      */
     public String toString()
     {
-        return toOctetString();
+        if ( csnStr == null )
+        {
+            StringBuilder buf = new StringBuilder( 40 );
+            
+            synchronized( sdf )
+            {
+                buf.append( sdf.format( new Date( timestamp ) ) );
+            }
+            
+            buf.append( '#' );
+            buf.append( "0x" ).append( Integer.toHexString( changeCount ) );
+            buf.append( '#' );
+            buf.append( replicaId );
+            buf.append( '#' );
+            buf.append( "0x" ).append( Integer.toHexString( operationNumber ) );
+            csnStr = buf.toString();
+        }
+        
+        return csnStr;
     }
 
 
@@ -275,7 +345,14 @@
      */
     public int hashCode()
     {
-        return replicaId.hashCode() ^ ( int ) timestamp ^ operationSequence;
+        int h = 37;
+        
+        h = h*17 + (int)(timestamp ^ (timestamp >>> 32));
+        h = h*17 + changeCount;
+        h = h*17 + replicaId.hashCode();
+        h = h*17 + operationNumber;
+        
+        return h;
     }
 
 
@@ -300,8 +377,11 @@
 
         CSN that = ( CSN ) o;
 
-        return ( timestamp == that.getTimestamp() ) && ( replicaId.equals( that.getReplicaId() ) )
-            && ( operationSequence == that.getOperationSequence() );
+        return 
+            ( timestamp == that.timestamp ) &&
+            ( changeCount == that.changeCount ) &&
+            ( replicaId.equals( that.replicaId ) ) &&
+            ( operationNumber == that.operationNumber );
     }
 
 
@@ -321,31 +401,40 @@
             return 1;
         }
         
-        long thatTimestamp = csn.getTimestamp();
+        // Compares the timestamp first
+        if ( this.timestamp < csn.timestamp )
+        {
+            return -1;
+        }
+        else if ( this.timestamp > csn.timestamp )
+        {
+            return 1;
+        }
 
-        if ( this.timestamp < thatTimestamp )
+        // Then the change count
+        if ( this.changeCount < csn.changeCount )
         {
             return -1;
         }
-        else if ( this.timestamp > thatTimestamp )
+        else if ( this.changeCount > csn.changeCount )
         {
             return 1;
         }
 
-        int replicaIdCompareResult = this.replicaId.compareTo( csn.getReplicaId() );
+        // Then the replicaId
+        int replicaIdCompareResult = this.replicaId.compareTo( csn.replicaId );
 
         if ( replicaIdCompareResult != 0 )
         {
             return replicaIdCompareResult;
         }
 
-        int thatSequence = csn.getOperationSequence();
-
-        if ( this.operationSequence < thatSequence )
+        // Last, not least, compares the operation number
+        if ( this.operationNumber < csn.operationNumber )
         {
             return -1;
         }
-        else if ( this.operationSequence > thatSequence )
+        else if ( this.operationNumber > csn.operationNumber )
         {
             return 1;
         }

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSNFactory.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSNFactory.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSNFactory.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/common/CSNFactory.java Tue Mar  3 22:52:42 2009
@@ -31,12 +31,12 @@
     private static volatile long lastTimestamp;
     
     /** The integer used to disambiguate CSN generated at the same time */
-    private static volatile int operationSequence;
+    private static volatile int changeCount;
 
 
     public CSNFactory()
     {
-        operationSequence = 0;
+        changeCount = 0;
     }
 
 
@@ -55,20 +55,18 @@
         // We will be able to generate 2 147 483 647 CSNs each 10 ms max
         if ( lastTimestamp == newTimestamp )
         {
-            operationSequence ++;
+            changeCount ++;
         }
         else
         {
             lastTimestamp = newTimestamp;
-            operationSequence = 0;
+            changeCount = 0;
         }
 
-        return new CSN( lastTimestamp, replicaId, operationSequence );
+        return new CSN( lastTimestamp, changeCount, replicaId, 0 );
     }
 
 
-
-
     /**
      * Returns a new {@link CSN} created from the given values.
      * 
@@ -77,12 +75,14 @@
      * @param timestamp The timestamp to use
      * @param replicaId Replica ID.  ReplicaID must be 1-8 digit alphanumeric
      * string.
-     * @param operationSequence The operation sequence to use
+     * @param changeCount The change count to use
      */
-    public CSN newInstance( long timestamp, String replicaId, int operationSequence )
+    public CSN newInstance( long timestamp, String replicaId, int changeCount )
     {
-        return new CSN( timestamp, replicaId, operationSequence );
+        return new CSN( timestamp, changeCount, replicaId, 0 );
     }
+    
+    
     /**
      * Generates a CSN used to purge data. Its replicaID is not associated
      * to a server. 
@@ -91,6 +91,6 @@
      */
     public CSN newInstance( long expirationDate )
     {
-        return new CSN( expirationDate, "ZZZZZZZZZZZZZZZZ", Integer.MAX_VALUE );
+        return new CSN( expirationDate, Integer.MAX_VALUE, "ZZZZZZZZZZZZZZZZ", Integer.MAX_VALUE );
     }
 }

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/operation/OperationFactory.java Tue Mar  3 22:52:42 2009
@@ -142,7 +142,7 @@
         // NOTE: We inlined addDefaultOperations() because ApacheDS currently
         // creates an index entry only for ADD operation (and not for
         // MODIFY operation)
-        cloneEntry.put( ApacheSchemaConstants.ENTRY_CSN_AT, csn.toOctetString() );
+        cloneEntry.put( ApacheSchemaConstants.ENTRY_CSN_AT, csn.toBytes() );
 
         return new AddEntryOperation( registries, csn, cloneEntry );
     }
@@ -419,7 +419,7 @@
                 new DefaultServerAttribute( 
                     ApacheSchemaConstants.ENTRY_DELETED_AT,
                     attributeRegistry.lookup( ApacheSchemaConstants.ENTRY_CSN_AT ),
-                    csn.toOctetString() ) ) );
+                    csn.toBytes() ) ) );
 
         return result;
     }

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ClientConnectionManager.java Tue Mar  3 22:52:42 2009
@@ -200,6 +200,7 @@
             while ( !timeToShutdown )
             {
                 connectUnconnected();
+                
                 try
                 {
                     Thread.sleep( 1000 );
@@ -221,6 +222,7 @@
                 // Someone might have modified the configuration,
                 // and therefore we try to detect newly added replicas.
                 Connection con = sessions.get( replica.getId() );
+                
                 if ( con == null )
                 {
                     con = new Connection();
@@ -260,6 +262,7 @@
                     else
                     {
                         con.delay *= 2;
+                        
                         if ( con.delay > 60 )
                         {
                             con.delay = 60;
@@ -268,10 +271,12 @@
                 }
 
                 Connector connector = new Connector( replica, con );
+                
                 synchronized ( con )
                 {
                     con.connector = connector;
                 }
+                
                 connector.start();
             }
         }
@@ -345,6 +350,9 @@
 
         public void run()
         {
+            System.out.println( System.currentTimeMillis() + "[Replica-" + ClientConnectionManager.this.configuration.getReplicaId() + 
+                "] Connecting to replica-" + replica.getId() +", delay = " + con.delay );
+            
             if ( con.delay > 0 )
             {
                 if ( LOG.isInfoEnabled() )
@@ -369,12 +377,22 @@
             
             try
             {
-                connector.setConnectTimeoutMillis( configuration.getResponseTimeout() );
-                connector.setHandler( new ReplicationClientProtocolHandler( interceptor )  );
+                if ( !connector.isActive() )
+                {
+                    System.out.println( System.currentTimeMillis() + 
+                        "[Replica-" + ClientConnectionManager.this.configuration.getReplicaId() + 
+                        "] Connection to "+ replica.getId() + ", Connector inactive --> Activating it" );
+                    connector.setConnectTimeoutMillis( configuration.getResponseTimeout() );
+                    connector.setHandler( new ReplicationClientProtocolHandler( interceptor )  );
+                }
+                
                 ConnectFuture future = connector.connect( replica.getAddress() );
 
                 future.awaitUninterruptibly();
                 session = future.getSession();
+                
+                System.out.println( System.currentTimeMillis() + "[Replica-" + ClientConnectionManager.this.configuration.getReplicaId()
+                    + "] is successfully connected to replica-" + replica.getId());
 
                 synchronized ( con )
                 {
@@ -386,8 +404,10 @@
             }
             catch ( RuntimeIoException e )
             {
-                LOG.warn( "[Replica-" + ClientConnectionManager.this.configuration.getReplicaId()
-                        + "] Failed to connect to replica-" + replica.getId(), e );
+                String message = System.currentTimeMillis() + "[Replica-" + ClientConnectionManager.this.configuration.getReplicaId()
+                + "] Failed to connect to replica-" + replica.getId();
+                System.out.println( message + ", message = " + e.getMessage() );
+                LOG.warn( message, e );
             }
             finally
             {

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationInterceptor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationInterceptor.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationInterceptor.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/ReplicationInterceptor.java Tue Mar  3 22:52:42 2009
@@ -361,7 +361,7 @@
 
         try
         {
-            filter = FilterParser.parse( "(&(" + ENTRY_CSN_OID + "<=" + purgeCSN.toOctetString() + ")(" + ENTRY_DELETED_OID
+            filter = FilterParser.parse( "(&(" + ENTRY_CSN_OID + "<=" + purgeCSN.toBytes() + ")(" + ENTRY_DELETED_OID
                 + "=TRUE))" );
         }
         catch ( ParseException e )

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageDecoder.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageDecoder.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageDecoder.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageDecoder.java Tue Mar  3 22:52:42 2009
@@ -87,7 +87,7 @@
                 throw new ProtocolDecoderException( "Invalid replicaId", e );
             }
 
-            updateVector.setCSN( new CSN( in.getLong(), replicaId, in.getInt() ) );
+            updateVector.setCSN( new CSN( in.getLong(), in.getInt(), replicaId, in.getInt() ) );
         }
     }
 

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageEncoder.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageEncoder.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageEncoder.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/codec/BeginLogEntriesAckMessageEncoder.java Tue Mar  3 22:52:42 2009
@@ -77,7 +77,7 @@
                 out.putString( replicaId, utf8encoder );
                 out.put( ( byte ) 0x00 );
                 out.putLong( csn.getTimestamp() );
-                out.putInt( csn.getOperationSequence() );
+                out.putInt( csn.getChangeCount() );
             }
             catch ( CharacterCodingException e )
             {

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationClientContextHandler.java Tue Mar  3 22:52:42 2009
@@ -45,6 +45,7 @@
 import org.apache.directory.server.core.CoreSession;
 import org.apache.directory.server.core.DefaultCoreSession;
 import org.apache.directory.server.core.authn.LdapPrincipal;
+import org.apache.directory.server.core.entry.ServerBinaryValue;
 import org.apache.directory.server.core.entry.ServerEntry;
 import org.apache.directory.server.core.filtering.EntryFilteringCursor;
 import org.apache.directory.server.core.interceptor.context.SearchOperationContext;
@@ -413,20 +414,13 @@
                 }
 
                 // Get entryCSN of the entry.  Skip if entryCSN value is invalid. 
-                CSN csn;
+                CSN csn = null;
 
                 try
                 {
-                    Object val = entryCSNAttr.get();
-
-                    if ( val instanceof byte[] )
-                    {
-                        csn = new CSN( StringTools.utf8ToString( ( byte[] ) val ) );
-                    }
-                    else
-                    {
-                        csn = new CSN( ( String ) val );
-                    }
+                    byte[] csnBytes = entryCSNAttr.getBytes();
+                    
+                    csn = new CSN( StringTools.utf8ToString( csnBytes ) );
                 }
                 catch ( IllegalArgumentException ex )
                 {

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerContextHandler.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerContextHandler.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerContextHandler.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/service/protocol/handler/ReplicationServerContextHandler.java Tue Mar  3 22:52:42 2009
@@ -85,6 +85,8 @@
 
     public void messageReceived( ReplicationContext ctx, Object message ) throws Exception
     {
+        System.out.println( "Message received : " + message );
+        
         if ( ctx.getState() == State.READY )
         {
             if ( message instanceof LogEntryMessage )

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationLogIterator.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationLogIterator.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationLogIterator.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationLogIterator.java Tue Mar  3 22:52:42 2009
@@ -26,7 +26,6 @@
 import java.sql.Statement;
 
 import org.apache.directory.mitosis.common.CSN;
-import org.apache.directory.mitosis.common.CSN;
 import org.apache.directory.mitosis.operation.Operation;
 import org.apache.directory.mitosis.operation.OperationCodec;
 import org.apache.directory.mitosis.store.ReplicationLogIterator;
@@ -76,8 +75,8 @@
         {
             String replicaId = rs.getString( 1 );
             long timestamp = rs.getLong( 2 );
-            int operationSequence = rs.getInt( 3 );
-            return new CSN( timestamp, replicaId, operationSequence );
+            int changeCount = rs.getInt( 3 );
+            return new CSN( timestamp, changeCount, replicaId, 0 );
         }
         catch ( Exception e )
         {

Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationStore.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationStore.java?rev=749816&r1=749815&r2=749816&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationStore.java (original)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/store/derby/DerbyReplicationStore.java Tue Mar  3 22:52:42 2009
@@ -452,7 +452,7 @@
                 + " (CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION) VALUES(?,?,?,?)" );
             ps.setString( 1, csn.getReplicaId() );
             ps.setLong( 2, csn.getTimestamp() );
-            ps.setInt( 3, csn.getOperationSequence() );
+            ps.setInt( 3, csn.getChangeCount() );
             ps.setBytes( 4, encodedOp );
             if ( ps.executeUpdate() != 1 )
             {
@@ -536,7 +536,7 @@
                 CSN csn = updateVector.getCSN( replicaId );
                 ps.setString( paramIdx++, replicaId );
                 ps.setLong( paramIdx++, csn.getTimestamp() );
-                ps.setInt( paramIdx++, csn.getOperationSequence() );
+                ps.setInt( paramIdx++, csn.getChangeCount() );
                 ps.setLong( paramIdx++, csn.getTimestamp() );
             }
             
@@ -559,7 +559,7 @@
         {
             for ( String knownReplicaId : knownReplicaIds )
             {
-                newUV.setCSN( new CSN( 0, knownReplicaId, 0 ) );
+                newUV.setCSN( new CSN( 0, 0, knownReplicaId, 0 ) );
             }
         }
 
@@ -589,7 +589,7 @@
                     + "ORDER BY CSN_TIMESTAMP ASC, CSN_OP_SEQ ASC" );
             ps.setString( 1, fromCSN.getReplicaId() );
             ps.setLong( 2, fromCSN.getTimestamp() );
-            ps.setInt( 3, fromCSN.getOperationSequence() );
+            ps.setInt( 3, fromCSN.getChangeCount() );
             ps.setLong( 4, fromCSN.getTimestamp() );
             rs = ps.executeQuery();
 
@@ -620,7 +620,7 @@
                 + " ? OR CSN_TIMESTAMP < ?)" );
             ps.setString( 1, toCSN.getReplicaId() );
             ps.setLong( 2, toCSN.getTimestamp() );
-            ps.setInt( 3, toCSN.getOperationSequence() );
+            ps.setInt( 3, toCSN.getChangeCount() );
             ps.setLong( 4, toCSN.getTimestamp() );
             return ps.executeUpdate();
         }
@@ -726,7 +726,7 @@
                 
                 if ( rs.next() )
                 {
-                    result.setCSN( new CSN( rs.getLong( 1 ), replicaId, rs.getInt( 2 ) ) );
+                    result.setCSN( new CSN( rs.getLong( 1 ), rs.getInt( 2 ), replicaId, 0 ) );
                 }
                 
                 rs.close();



Mime
View raw message