directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kayyag...@apache.org
Subject svn commit: r947278 [1/2] - in /directory/apacheds/trunk/protocol-ldap: ./ src/main/java/org/apache/directory/server/ldap/replication/
Date Sat, 22 May 2010 10:26:50 GMT
Author: kayyagari
Date: Sat May 22 10:26:50 2010
New Revision: 947278

URL: http://svn.apache.org/viewvc?rev=947278&view=rev
Log:
o injecting the syncrepl-based replication provider implementation
o added a dependency on activemq-core to use its queues for storing the operations performed on the DIT as event messages in chronological order

Added:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaDitStoreUtil.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplProvider.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java
Modified:
    directory/apacheds/trunk/protocol-ldap/pom.xml

Modified: directory/apacheds/trunk/protocol-ldap/pom.xml
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/pom.xml?rev=947278&r1=947277&r2=947278&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/pom.xml (original)
+++ directory/apacheds/trunk/protocol-ldap/pom.xml Sat May 22 10:26:50 2010
@@ -53,6 +53,45 @@
       <groupId>${project.groupId}</groupId>
       <version>${project.version}</version>
     </dependency>
+
+    <!-- depend on the activemq-core components alone, excluding the Spring and
+         OSGi related jars, because we use *only* its queues and nothing else -->
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <version>5.3.1</version>
+      <exclusions>
+       <exclusion>
+        <groupId>org.springframework</groupId>
+        <artifactId>spring-context</artifactId>
+       </exclusion>
+       <exclusion>
+        <groupId>org.springframework.osgi</groupId>
+        <artifactId>spring-osgi-core</artifactId>
+       </exclusion>
+       <exclusion>
+        <groupId>org.osgi</groupId>
+        <artifactId>org.osgi.core</artifactId>
+       </exclusion>
+       <exclusion>
+        <groupId>avalon-framework</groupId>
+        <artifactId>avalon-framework</artifactId>
+       </exclusion>
+       <exclusion>
+        <groupId>logkit</groupId>
+        <artifactId>logkit</artifactId>
+       </exclusion>
+       <exclusion>
+        <groupId>javax.servlet</groupId>
+        <artifactId>servlet-api</artifactId>
+       </exclusion>
+       <exclusion>
+        <groupId>org.apache.geronimo.specs</groupId>
+        <artifactId>geronimo-j2ee-management_1.0_spec</artifactId>
+       </exclusion>
+      </exclusions>
+    </dependency>
+    
   </dependencies>
 
   <build>

Added: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaDitStoreUtil.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaDitStoreUtil.java?rev=947278&view=auto
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaDitStoreUtil.java (added)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaDitStoreUtil.java Sat May 22 10:26:50 2010
@@ -0,0 +1,202 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+
+package org.apache.directory.server.ldap.replication;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.directory.ldap.client.api.message.SearchResponse;
+import org.apache.directory.ldap.client.api.message.SearchResultEntry;
+import org.apache.directory.server.core.CoreSession;
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.LdapCoreSessionConnection;
+import org.apache.directory.server.core.event.EventType;
+import org.apache.directory.server.core.event.NotificationCriteria;
+import org.apache.directory.shared.ldap.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.cursor.Cursor;
+import org.apache.directory.shared.ldap.entry.DefaultEntry;
+import org.apache.directory.shared.ldap.entry.DefaultEntryAttribute;
+import org.apache.directory.shared.ldap.entry.DefaultModification;
+import org.apache.directory.shared.ldap.entry.Entry;
+import org.apache.directory.shared.ldap.entry.EntryAttribute;
+import org.apache.directory.shared.ldap.entry.Modification;
+import org.apache.directory.shared.ldap.entry.ModificationOperation;
+import org.apache.directory.shared.ldap.filter.SearchScope;
+import org.apache.directory.shared.ldap.message.AliasDerefMode;
+import org.apache.directory.shared.ldap.name.DN;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper that stores, updates and loads the details of replication consumers as entries under ou=consumers,ou=system.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class ReplicaDitStoreUtil
+{
+    private CoreSession adminSession;
+
+    private SchemaManager schemaManager;
+
+    private static final String REPL_CONSUMER_DN = "ou=consumers,ou=system";
+
+    private static final Logger LOG = LoggerFactory.getLogger( ReplicaDitStoreUtil.class );
+
+    private Map<Integer, List<Modification>> modMap = new HashMap<Integer, List<Modification>>();
+
+    private LdapCoreSessionConnection coreConnection;
+    
+    public ReplicaDitStoreUtil( DirectoryService dirService ) throws Exception
+    {
+        this.adminSession = dirService.getAdminSession();
+        this.schemaManager = dirService.getSchemaManager();
+        coreConnection = new LdapCoreSessionConnection( adminSession );
+        
+        init();
+    }
+
+
+    private void init() throws Exception
+    {
+        DN replConsumerDn = new DN( REPL_CONSUMER_DN );
+
+        if ( !adminSession.exists( replConsumerDn ) )
+        {
+            LOG.debug( "creating the entry for storing replication consumers' details" );
+            Entry entry = new DefaultEntry( schemaManager );
+            entry.setDn( replConsumerDn );
+            entry.add( SchemaConstants.OBJECT_CLASS_AT, SchemaConstants.ORGANIZATIONAL_UNIT_OC );
+            entry.add( SchemaConstants.OU_AT, "consumers" );
+
+            adminSession.add( entry );
+        }
+        
+    }
+
+
+    public void addConsumerEntry( ReplicaEventLog replica ) throws Exception
+    {
+        if ( replica == null )
+        {
+            return;
+        }
+
+        Entry entry = new DefaultEntry( schemaManager );
+        entry.setDn( new DN( "ads-dsReplicaId=" + replica.getId() + "," + REPL_CONSUMER_DN ) );
+        
+        entry.add( SchemaConstants.OBJECT_CLASS_AT, "ads-replConsumer" );
+        entry.add( "ads-dsReplicaId", String.valueOf( replica.getId() ) );
+        entry.add( "ads-replAliasDerefMode", String
+            .valueOf( replica.getSearchCriteria().getAliasDerefMode().getValue() ) );
+        entry.add( "ads-replBase", replica.getSearchCriteria().getBase().getName() );
+        entry.add( "ads-replLastSentCsn", replica.getLastSentCsn() );
+        entry.add( "ads-replSearchScope", String.valueOf( replica.getSearchCriteria().getScope().getScope() ) );
+        entry.add( "ads-replSearchFilter", replica.getSearchFilter() );
+        
+        adminSession.add( entry );
+    }
+
+
+    public void updateReplicaLastSentCsn( ReplicaEventLog replica ) throws Exception
+    {
+     
+        List<Modification> mods = modMap.get( replica.getId() );
+        EntryAttribute lastSentCsnAt = null;
+        if( mods == null )
+        {
+            lastSentCsnAt = new DefaultEntryAttribute( schemaManager.lookupAttributeTypeRegistry( "ads-replLastSentCsn" ) );
+            lastSentCsnAt.add( replica.getLastSentCsn() );
+            
+            Modification mod = new DefaultModification();
+            mod.setOperation( ModificationOperation.REPLACE_ATTRIBUTE );
+            mod.setAttribute( lastSentCsnAt );
+            
+            mods = new ArrayList<Modification>( 1 );
+            mods.add( mod );
+        }
+        else
+        {
+            lastSentCsnAt = mods.get( 0 ).getAttribute();
+            lastSentCsnAt.clear(); // clearing is mandatory
+            lastSentCsnAt.add( replica.getLastSentCsn() );
+        }
+
+        DN dn = new DN( "ads-dsReplicaId=" + replica.getId() + "," + REPL_CONSUMER_DN );
+        adminSession.modify( dn, mods );
+    }
+    
+    
+    public List<ReplicaEventLog> getReplicaConsumers() throws Exception
+    {
+        List<ReplicaEventLog> replicas = new ArrayList<ReplicaEventLog>();
+        
+        Cursor<SearchResponse> cursor = coreConnection.search( REPL_CONSUMER_DN, "(objectClass=ads-replConsumer)", SearchScope.ONELEVEL, "+", "*" );
+        while( cursor.next() )
+        {
+            Entry entry = ( ( SearchResultEntry ) cursor.get() ).getEntry();
+            ReplicaEventLog replica = convertEntryToReplica( entry );
+            replicas.add( replica );
+        }
+        cursor.close();
+        
+        return replicas;
+    }
+
+    
+    private ReplicaEventLog convertEntryToReplica( Entry entry ) throws Exception
+    {
+        String id = entry.get( "ads-dsReplicaId" ).getString();
+        ReplicaEventLog replica = new ReplicaEventLog( Integer.parseInt( id ) );
+        
+        NotificationCriteria searchCriteria = new NotificationCriteria();
+        
+        String aliasMode = entry.get( "ads-replAliasDerefMode" ).getString();
+        searchCriteria.setAliasDerefMode( AliasDerefMode.getDerefMode( Integer.parseInt( aliasMode ) ) );
+        
+        String baseDn = entry.get( "ads-replBase" ).getString();
+        searchCriteria.setBase( baseDn );
+        
+        String lastSentCsn = entry.get( "ads-replLastSentCsn" ).getString();
+        replica.setLastSentCsn( lastSentCsn );
+        
+        String scope = entry.get( "ads-replSearchScope" ).getString();
+        searchCriteria.setScope( SearchScope.getSearchScope( Integer.parseInt( scope ) ) );
+
+        String filter = entry.get( "ads-replSearchFilter" ).getString();
+        searchCriteria.setFilter( filter );
+        replica.setSearchFilter( filter );
+        
+        searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
+        replica.setSearchCriteria( searchCriteria );
+        
+        // explicitly mark the replica as not-dirty, cause we just loaded it from 
+        // the store, this prevents updating the replica info immediately after loading
+        replica.setDirty( false );
+        
+        return replica;
+    }
+}

Added: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java?rev=947278&view=auto
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java (added)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java Sat May 22 10:26:50 2010
@@ -0,0 +1,348 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+
+package org.apache.directory.server.ldap.replication;
+
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.directory.server.core.event.EventType;
+import org.apache.directory.server.core.event.NotificationCriteria;
+import org.apache.directory.shared.ldap.entry.Entry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A message log used for storing the changes done on the DIT under a syncrepl consumer's search base.
+ * A separate log is maintained for each syncrepl consumer.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class ReplicaEventLog
+{
+
+    /** IP address of the syncrepl consumer */
+    private String hostName;
+
+    /** the unmodified search filter as it was when received from the client */
+    private String searchFilter;
+
+    /** the csn that was sent to the client during the last sync session*/
+    private String lastSentCsn;
+
+    /** the persistent listener */
+    private SyncReplSearchListener persistentListener;
+
+    /** notification criteria used by the persistent search */
+    private NotificationCriteria searchCriteria;
+
+    /** the replica id */
+    private int replicaId;
+
+    /** flag indicating refreshAndPersist mode */
+    private boolean refreshNPersist;
+
+    // fields that won't be serialized
+
+    /** the ActiveMQ session */
+    private ActiveMQSession amqSession;
+
+    /** the Queue used for storing messages */
+    private ActiveMQQueue queue;
+
+    /** message producer for Queue */
+    private ActiveMQMessageProducer producer;
+
+    /** the messaging system's connection */
+    private ActiveMQConnection amqConnection;
+
+    /** ActiveMQ's BrokerService */
+    private BrokerService brokerService;
+
+    private volatile boolean dirty;
+
+    private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLog.class );
+
+
+    public ReplicaEventLog()
+    {
+
+    }
+
+
+    public ReplicaEventLog( int replicaId )
+    {
+        this.replicaId = replicaId;
+        this.searchCriteria = new NotificationCriteria();
+        this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
+    }
+
+
+    /**
+     * instantiates a message queue and corresponding producer for storing DIT changes  
+     *
+     * @param amqConnection ActiveMQ connection
+     * @param brokerService ActiveMQ's broker service
+     * @throws Exception
+     */
+    public void configure( final ActiveMQConnection amqConnection, final BrokerService brokerService ) throws Exception
+    {
+        if ( amqSession == null || !amqSession.isRunning() )
+        {
+            this.amqConnection = amqConnection;
+            amqSession = ( ActiveMQSession ) amqConnection.createSession( false, ActiveMQSession.AUTO_ACKNOWLEDGE );
+            queue = ( ActiveMQQueue ) amqSession.createQueue( getQueueName() );
+            producer = ( ActiveMQMessageProducer ) amqSession.createProducer( queue );
+            this.brokerService = brokerService;
+        }
+    }
+
+
+    /**
+     * stores the given EventType and Entry in the queue 
+     *
+     * @param event the EventType
+     * @param entry the modified Entry
+     */
+    public void log( EventType event, Entry entry )
+    {
+        LOG.debug( "logging entry with DN {} with the event {}", entry.getDn(), event );
+        log( new ReplicaEventMessage( event, entry ) );
+    }
+
+
+    public void log( ReplicaEventMessage message )
+    {
+        try
+        {
+            ActiveMQObjectMessage ObjectMessage = ( ActiveMQObjectMessage ) amqSession.createObjectMessage();
+            ObjectMessage.setObject( message );
+            producer.send( ObjectMessage );
+        }
+        catch ( Exception e )
+        {
+            LOG.warn( "Failed to insert the entry into syncrepl log", e );
+        }
+
+    }
+
+
+    /**
+     * closes the message producer and deletes the queue (to remove the log). The queue
+     * and its producer are NOT recreated here; call recreate() afterwards for that
+     *
+     * @throws Exception
+     */
+    public void truncate() throws Exception
+    {
+        producer.close();
+
+        String queueName = queue.getQueueName();
+        LOG.debug( "deleting the queue {}", queueName );
+        amqConnection.destroyDestination( queue );
+        queue = null;
+    }
+
+
+    public void recreate() throws Exception
+    {
+        LOG.debug( "recreating the queue for the replica id {}", replicaId );
+        queue = ( ActiveMQQueue ) amqSession.createQueue( getQueueName() );
+        producer = ( ActiveMQMessageProducer ) amqSession.createProducer( queue );
+    }
+
+
+    public void destroy() throws Exception
+    {
+        // first truncate
+        truncate();
+
+        // then close the producer and session, DO NOT close connection 
+        producer.close();
+        amqSession.close();
+    }
+
+
+    @Override
+    public boolean equals( Object obj )
+    {
+        if ( !( obj instanceof ReplicaEventLog ) )
+        {
+            return false;
+        }
+
+        ReplicaEventLog other = ( ReplicaEventLog ) obj;
+
+        if ( replicaId != other.getId() )
+        {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode()
+    {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + searchFilter.hashCode();
+        result = prime * result + hostName.hashCode();
+        return result;
+    }
+
+
+    public int compareTo( ReplicaEventLog o )
+    {
+        if ( this.equals( o ) )
+        {
+            return 0;
+        }
+
+        return 1;
+    }
+
+
+    public SyncReplSearchListener getPersistentListener()
+    {
+        return persistentListener;
+    }
+
+
+    public void setPersistentListener( SyncReplSearchListener persistentListener )
+    {
+        this.persistentListener = persistentListener;
+    }
+
+
+    public NotificationCriteria getSearchCriteria()
+    {
+        return searchCriteria;
+    }
+
+
+    public void setSearchCriteria( NotificationCriteria searchCriteria )
+    {
+        this.searchCriteria = searchCriteria;
+    }
+
+
+    public boolean isRefreshNPersist()
+    {
+        return refreshNPersist;
+    }
+
+
+    public void setRefreshNPersist( boolean refreshNPersist )
+    {
+        this.refreshNPersist = refreshNPersist;
+    }
+
+
+    public int getId()
+    {
+        return replicaId;
+    }
+
+
+    public String getLastSentCsn()
+    {
+        return lastSentCsn;
+    }
+
+
+    public void setLastSentCsn( String lastSentCsn )
+    {
+        // set only if there is a change in cookie value
+        // this will avoid setting the dirty flag which eventually is used for
+        // storing the details of this log
+        if ( !lastSentCsn.equals( this.lastSentCsn ) )
+        {
+            this.lastSentCsn = lastSentCsn;
+            dirty = true;
+        }
+    }
+
+
+    public String getHostName()
+    {
+        return hostName;
+    }
+
+
+    public void setHostName( String hostName )
+    {
+        this.hostName = hostName;
+    }
+
+
+    public String getSearchFilter()
+    {
+        return searchFilter;
+    }
+
+
+    public void setSearchFilter( String searchFilter )
+    {
+        this.searchFilter = searchFilter;
+    }
+
+
+    public boolean isDirty()
+    {
+        return dirty;
+    }
+
+
+    public void setDirty( boolean dirty )
+    {
+        this.dirty = dirty;
+    }
+
+
+    public String getQueueName()
+    {
+        return "replicaId=" + replicaId;
+    }
+
+
+    public ReplicaEventLogCursor getCursor() throws Exception
+    {
+        Queue regionQueue = ( Queue ) brokerService.getRegionBroker().getDestinationMap().get( queue );
+        return new ReplicaEventLogCursor( amqSession, queue, regionQueue );
+    }
+
+
+    @Override
+    public String toString()
+    {
+        return "ClientMessageQueueLog [ipAddress=" + hostName + ", filter=" + searchFilter + ", replicaId=" + replicaId
+            + ", lastSentCookie=" + lastSentCsn + "]";
+    }
+
+}

Added: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java?rev=947278&view=auto
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java (added)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java Sat May 22 10:26:50 2010
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.directory.server.ldap.replication;
+
+
+import java.util.Iterator;
+
+import org.apache.activemq.ActiveMQQueueBrowser;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.directory.shared.ldap.cursor.AbstractCursor;
+
+
+/**
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+class ReplicaEventLogCursor extends AbstractCursor<ReplicaEventMessage>
+{
+
+    private ActiveMQQueueBrowser browser;
+
+    private Queue regionQueue;
+
+
+    public ReplicaEventLogCursor( ActiveMQSession session, ActiveMQQueue queue, Queue regionQueue ) throws Exception
+    {
+        // commit before starting browser, to see the latest view of the Queue data
+//        session.commit();
+        
+        browser = ( ActiveMQQueueBrowser ) session.createBrowser( queue );
+        
+        this.regionQueue = regionQueue;
+    }
+
+
+    public void after( ReplicaEventMessage arg0 ) throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public void afterLast() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public boolean available()
+    {
+        return browser.hasMoreElements();
+    }
+
+
+    public void before( ReplicaEventMessage arg0 ) throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public void beforeFirst() throws Exception
+    {
+    }
+
+
+    public boolean first() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public ReplicaEventMessage get() throws Exception
+    {
+        ActiveMQObjectMessage amqObj = ( ActiveMQObjectMessage ) browser.nextElement();
+        System.out.println( "========================================= " + amqObj );
+        ReplicaEventMessage message = ( ReplicaEventMessage ) amqObj.getObject();
+        regionQueue.removeMessage( amqObj.getJMSMessageID() );
+        
+        return message;
+    }
+
+
+    public boolean isElementReused()
+    {
+        return false;
+    }
+
+
+    public boolean last() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public boolean next() throws Exception
+    {
+        return browser.hasMoreElements();
+    }
+
+
+    public boolean previous() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
+
+
+    @Override
+    public void close() throws Exception
+    {
+        browser.close();
+        super.close();
+    }
+
+
+    @Override
+    public void close( Exception cause ) throws Exception
+    {
+        browser.close();
+        super.close( cause );
+    }
+
+
+    @Override
+    public Iterator<ReplicaEventMessage> iterator()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+}

Added: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java?rev=947278&view=auto
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java (added)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java Sat May 22 10:26:50 2010
@@ -0,0 +1,237 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+
+package org.apache.directory.server.ldap.replication;
+
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Iterator;
+
+import org.apache.directory.server.core.event.EventType;
+import org.apache.directory.shared.i18n.I18n;
+import org.apache.directory.shared.ldap.codec.controls.replication.syncmodifydn.SyncModifyDnControl;
+import org.apache.directory.shared.ldap.entry.DefaultEntry;
+import org.apache.directory.shared.ldap.entry.DefaultEntryAttribute;
+import org.apache.directory.shared.ldap.entry.Entry;
+import org.apache.directory.shared.ldap.entry.EntryAttribute;
+import org.apache.directory.shared.ldap.message.control.replication.SyncModifyDnType;
+import org.apache.directory.shared.ldap.name.DN;
+import org.apache.directory.shared.ldap.name.DnSerializer;
+import org.apache.directory.shared.ldap.schema.AttributeType;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Externalizable message that carries an Entry together with either an EventType or a SyncModifyDnControl.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class ReplicaEventMessage implements Externalizable
+{
+    private EventType eventType;
+    private Entry entry;
+
+    private SyncModifyDnControl modDnControl;
+    
+    private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventMessage.class );
+
+    private static SchemaManager schemaManager;
+
+
+    public ReplicaEventMessage()
+    {
+        // used by deserializer
+    }
+
+
+    public ReplicaEventMessage( EventType eventType, Entry entry )
+    {
+        this.eventType = eventType;
+        this.entry = entry;
+    }
+
+
+    public ReplicaEventMessage( SyncModifyDnControl modDnControl, Entry entry )
+    {
+        this.modDnControl = modDnControl;
+        this.entry = entry;
+    }
+
+    
+    public EventType getEventType()
+    {
+        return eventType;
+    }
+
+
+    public Entry getEntry()
+    {
+        return entry;
+    }
+
+
+    public SyncModifyDnControl getModDnControl()
+    {
+        return modDnControl;
+    }
+
+    
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException
+    {
+
+        byte b = in.readByte();
+        if( b == 0 ) // handle the SyncModDnControl
+        {
+            SyncModifyDnType modDnType = SyncModifyDnType.getModifyDnType( in.readShort() );
+            
+            modDnControl = new SyncModifyDnControl();
+            modDnControl.setModDnType( modDnType );
+            
+            modDnControl.setEntryDn( in.readUTF() );
+            
+            switch( modDnType )
+            {
+                case MOVE:
+                    modDnControl.setNewSuperiorDn( in.readUTF() );
+                    break;
+                   
+                case RENAME:
+                    modDnControl.setNewRdn( in.readUTF() );
+                    modDnControl.setDeleteOldRdn( in.readBoolean() );
+                    break;
+                    
+                case MOVEANDRENAME:
+                    modDnControl.setNewSuperiorDn( in.readUTF() );
+                    modDnControl.setNewRdn( in.readUTF() );
+                    modDnControl.setDeleteOldRdn( in.readBoolean() );
+            }
+        }
+        else // read the event type
+        {
+            eventType = EventType.getType( in.readShort() );
+        }
+
+        // initialize the entry
+        entry = new DefaultEntry( schemaManager );
+
+        // Read the DN
+        DN dn = DnSerializer.deserialize( in );
+        entry.setDn( dn );
+
+        // Read the number of attributes
+        int nbAttributes = in.readInt();
+
+        // Read the attributes
+        for ( int i = 0; i < nbAttributes; i++ )
+        {
+            // Read the attribute's OID
+            String oid = in.readUTF();
+
+            try
+            {
+                AttributeType attributeType = schemaManager.lookupAttributeTypeRegistry( oid );
+
+                // Create the attribute we will read
+                DefaultEntryAttribute attribute = new DefaultEntryAttribute( attributeType );
+
+                // Read the attribute
+                attribute.deserialize( in );
+
+                entry.add( attribute );
+            }
+            catch ( Exception ne )
+            {
+                entry = null;
+                // The attribute type lookup or deserialization failed: the whole entry is discarded (set to null), not just this attribute
+                LOG.warn( I18n.err( I18n.ERR_04470, oid ) );
+            }
+        }
+    }
+
+
+    public void writeExternal( ObjectOutput out ) throws IOException
+    {
+        if( eventType == null )
+        {
+            out.writeByte( 0 );
+            
+            SyncModifyDnType modDnType = modDnControl.getModDnType();
+            out.writeShort( modDnType.getValue() );
+            out.writeUTF( modDnControl.getEntryDn() );
+            
+            switch( modDnType )
+            {
+                case MOVE:
+                    out.writeUTF( modDnControl.getNewSuperiorDn() );
+                    break;
+                   
+                case RENAME:
+                    out.writeUTF( modDnControl.getNewRdn() );
+                    out.writeBoolean( modDnControl.isDeleteOldRdn() );
+                    break;
+                    
+                case MOVEANDRENAME:
+                    out.writeUTF( modDnControl.getNewSuperiorDn() );
+                    out.writeUTF( modDnControl.getNewRdn() );
+                    out.writeBoolean( modDnControl.isDeleteOldRdn() );
+            }
+        }
+        else
+        {
+            out.writeByte( 1 );
+            out.writeShort( eventType.getMask() );
+        }
+
+        // then DN
+        DnSerializer.serialize( entry.getDn(), out );
+
+        // Then the attributes.
+        out.writeInt( entry.size() );
+
+        // Iterate through the keys. We store the Attribute
+        // here, to be able to restore it in the readExternal :
+        // we need access to the registries, which are not available
+        // in the ServerAttribute class.
+        Iterator<EntryAttribute> attrItr = entry.iterator();
+        while ( attrItr.hasNext() )
+        {
+            DefaultEntryAttribute attribute = ( DefaultEntryAttribute ) attrItr.next();
+            // Write the oid to be able to restore the AttributeType when deserializing
+            // the attribute
+            out.writeUTF( attribute.getAttributeType().getOid() );
+
+            // Write the attribute
+            attribute.serialize( out );
+        }
+    }
+
+
+    public static void setSchemaManager( SchemaManager schemaManager )
+    {
+        ReplicaEventMessage.schemaManager = schemaManager;
+    }
+
+}

Added: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplProvider.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplProvider.java?rev=947278&view=auto
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplProvider.java (added)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplProvider.java Sat May 22 10:26:50 2010
@@ -0,0 +1,999 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+
+package org.apache.directory.server.ldap.replication;
+
+
+import static java.lang.Math.min;
+import static org.apache.directory.server.ldap.LdapServer.NO_SIZE_LIMIT;
+import static org.apache.directory.server.ldap.LdapServer.NO_TIME_LIMIT;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.entry.ClonedServerEntry;
+import org.apache.directory.server.core.event.EventType;
+import org.apache.directory.server.core.event.NotificationCriteria;
+import org.apache.directory.server.core.filtering.EntryFilteringCursor;
+import org.apache.directory.server.i18n.I18n;
+import org.apache.directory.server.ldap.LdapServer;
+import org.apache.directory.server.ldap.LdapSession;
+import org.apache.directory.server.ldap.handlers.SearchAbandonListener;
+import org.apache.directory.server.ldap.handlers.SearchTimeLimitingMonitor;
+import org.apache.directory.shared.ldap.codec.controls.ManageDsaITControl;
+import org.apache.directory.shared.ldap.codec.controls.replication.syncDoneValue.SyncDoneValueControl;
+import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueControl;
+import org.apache.directory.shared.ldap.codec.controls.replication.syncRequestValue.SyncRequestValueControl;
+import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControl;
+import org.apache.directory.shared.ldap.codec.controls.replication.syncmodifydn.SyncModifyDnControl;
+import org.apache.directory.shared.ldap.codec.util.LdapURLEncodingException;
+import org.apache.directory.shared.ldap.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.csn.Csn;
+import org.apache.directory.shared.ldap.entry.Entry;
+import org.apache.directory.shared.ldap.entry.EntryAttribute;
+import org.apache.directory.shared.ldap.entry.StringValue;
+import org.apache.directory.shared.ldap.entry.Value;
+import org.apache.directory.shared.ldap.exception.LdapException;
+import org.apache.directory.shared.ldap.filter.AndNode;
+import org.apache.directory.shared.ldap.filter.EqualityNode;
+import org.apache.directory.shared.ldap.filter.ExprNode;
+import org.apache.directory.shared.ldap.filter.GreaterEqNode;
+import org.apache.directory.shared.ldap.filter.LessEqNode;
+import org.apache.directory.shared.ldap.filter.OrNode;
+import org.apache.directory.shared.ldap.filter.PresenceNode;
+import org.apache.directory.shared.ldap.filter.SearchScope;
+import org.apache.directory.shared.ldap.message.IntermediateResponseImpl;
+import org.apache.directory.shared.ldap.message.ReferralImpl;
+import org.apache.directory.shared.ldap.message.ResultCodeEnum;
+import org.apache.directory.shared.ldap.message.SearchResponseEntryImpl;
+import org.apache.directory.shared.ldap.message.SearchResponseReferenceImpl;
+import org.apache.directory.shared.ldap.message.control.replication.SyncStateTypeEnum;
+import org.apache.directory.shared.ldap.message.control.replication.SynchronizationInfoEnum;
+import org.apache.directory.shared.ldap.message.control.replication.SynchronizationModeEnum;
+import org.apache.directory.shared.ldap.message.internal.InternalIntermediateResponse;
+import org.apache.directory.shared.ldap.message.internal.InternalLdapResult;
+import org.apache.directory.shared.ldap.message.internal.InternalResponse;
+import org.apache.directory.shared.ldap.message.internal.InternalSearchRequest;
+import org.apache.directory.shared.ldap.message.internal.InternalSearchResponseDone;
+import org.apache.directory.shared.ldap.message.internal.InternalSearchResponseEntry;
+import org.apache.directory.shared.ldap.message.internal.InternalSearchResponseReference;
+import org.apache.directory.shared.ldap.schema.AttributeType;
+import org.apache.directory.shared.ldap.util.LdapURL;
+import org.apache.directory.shared.ldap.util.StringTools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * NOTE: documentation is missing in many places. It will be added once the functionality is satisfactory.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+@SuppressWarnings("unchecked")
+public class SyncReplProvider implements ReplicationProvider
+{
+
+    /** Separator between the replica id and the CSN inside a sync cookie (<replicaId>;<Csn value>) */
+    public static final String REPLICA_ID_DELIM = ";";
+
+    /** The logger of this class */
+    private static final Logger LOG = LoggerFactory.getLogger( SyncReplProvider.class );
+
+    /** Set to true at the end of a successful init() and reset to false by stop() */
+    private boolean initialized = false;
+
+    /** The directory service obtained from the Ldap server in init() */
+    private DirectoryService dirService;
+
+    /** The reference on the Ldap server instance */
+    protected LdapServer ldapServer;
+
+    /** The objectClass AttributeType, looked up on first use when building the referral filter node */
+    private AttributeType objectClassAttributeType;
+
+    /** The event logs of the known replicas, keyed by replica id */
+    private Map<Integer, ReplicaEventLog> replicaLogMap = new HashMap<Integer, ReplicaEventLog>();
+
+    /** The ActiveMQ broker created and started in init() */
+    private BrokerService brokerService;
+    
+    /** The connection to the broker, created in init() */
+    private ActiveMQConnection amqConnection;
+
+    /** The "syncrepl-data" directory located under the directory service's working directory */
+    private File syncReplData;
+
+    /** Counter raised to the highest replica id found while loading the stored replica details */
+    private AtomicInteger replicaCount = new AtomicInteger(0);
+
+    /** The helper used to read and store the consumer details in the DIT */
+    private ReplicaDitStoreUtil replicaUtil;
+    
+    /**
+     * Creates a new, uninitialized provider. No work is done here, the
+     * resources are set up by {@link #init(LdapServer)}.
+     */
+    public SyncReplProvider()
+    {
+    }
+
+
+    /**
+     * Initializes the syncrepl provider: creates the "syncrepl-data" directory
+     * under the directory service's working directory, starts a persistent
+     * ActiveMQ broker on the vm://localhost connector, opens a connection to it,
+     * loads the stored replica details, registers the persistent searches and
+     * starts the daemon thread running the consumer info update task.
+     * 
+     * Calling this method on an already initialized provider only logs a warning.
+     * 
+     * @param server the Ldap server this provider is attached to
+     * @throws RuntimeException wrapping any failure met during the initialization.
+     * The broker and the connection that may have been started are released before
+     * the exception is thrown. 
+     */
+    public void init( LdapServer server )
+    {
+        if ( initialized )
+        {
+            LOG.warn( "syncrepl provider was already initialized" );
+            return;
+        }
+
+        try
+        {
+            LOG.info( "initializing the syncrepl provider" );
+
+            this.ldapServer = server;
+            this.dirService = server.getDirectoryService();
+
+            File workDir = dirService.getWorkingDirectory();
+            syncReplData = new File( workDir, "syncrepl-data" );
+
+            // fail early with a clear message instead of letting the broker
+            // stumble later on a missing data directory
+            if ( !syncReplData.exists() && !syncReplData.mkdirs() )
+            {
+                throw new IllegalStateException( "Could not create the syncrepl data directory " + syncReplData );
+            }
+
+            String path = syncReplData.getPath();
+
+            brokerService = new BrokerService();
+            brokerService.setUseJmx( false );
+            brokerService.setPersistent( true );
+            brokerService.setDataDirectory( path );
+
+            URI vmConnectorUri = new URI( "vm://localhost" );
+            brokerService.setVmConnectorURI( vmConnectorUri );
+
+            brokerService.start();
+            ActiveMQConnectionFactory amqFactory = new ActiveMQConnectionFactory( vmConnectorUri.toString() );
+            amqFactory.setObjectMessageSerializationDefered( false );
+
+            amqConnection = ( ActiveMQConnection ) amqFactory.createConnection();
+            amqConnection.start();
+
+            // set the static reference to SchemaManager
+            ReplicaEventMessage.setSchemaManager( dirService.getSchemaManager() );
+
+            replicaUtil = new ReplicaDitStoreUtil( dirService );
+
+            loadReplicaInfo();
+
+            registerPersistentSearches();
+
+            Thread consumerInfoUpdateThread = new Thread( createConsumerInfoUpdateTask() );
+            consumerInfoUpdateThread.setDaemon( true );
+            consumerInfoUpdateThread.start();
+
+            initialized = true;
+            LOG.info( "syncrepl provider initialized successfully" );
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to initialize the log files required by the syncrepl provider", e );
+
+            // do not leave a half started broker/connection behind, a later
+            // init() attempt would otherwise have to cope with them
+            releaseMessagingResourcesAfterFailedInit();
+
+            throw new RuntimeException( e );
+        }
+    }
+
+
+    /**
+     * Closes the connection and stops the broker that a failed init() may
+     * have created. Failures are only logged.
+     */
+    private void releaseMessagingResourcesAfterFailedInit()
+    {
+        if ( amqConnection != null )
+        {
+            try
+            {
+                amqConnection.close();
+            }
+            catch ( Exception e )
+            {
+                LOG.warn( "Failed to close the message queue connection", e );
+            }
+        }
+
+        if ( brokerService != null )
+        {
+            try
+            {
+                brokerService.stop();
+            }
+            catch ( Exception e )
+            {
+                LOG.warn( "Failed to stop the message broker", e );
+            }
+        }
+    }
+
+
+    /**
+     * Stops the provider: closes the connection to the message broker, then
+     * stops the broker itself and marks the provider as not initialized.
+     * 
+     * The two resources are released independently, so a failure to release
+     * one of them does not prevent the release of the other. Failures are only
+     * logged. Calling this method on a provider which was never initialized 
+     * is harmless.
+     */
+    public void stop()
+    {
+        // close the client connection first, while its broker is still running
+        if ( amqConnection != null )
+        {
+            try
+            {
+                amqConnection.close();
+            }
+            catch ( Exception e )
+            {
+                LOG.warn( "Failed to close the message queue connection", e );
+            }
+        }
+
+        if ( brokerService != null )
+        {
+            try
+            {
+                brokerService.stop();
+            }
+            catch ( Exception e )
+            {
+                LOG.warn( "Failed to stop the message broker", e );
+            }
+        }
+
+        initialized = false;
+    }
+
+
+    public void handleSyncRequest( LdapSession session, InternalSearchRequest req ) throws LdapException
+    {
+        try
+        {
+            SyncRequestValueControl syncControl = ( SyncRequestValueControl ) req.getControls().get(
+                SyncRequestValueControl.CONTROL_OID );
+            
+            // cookie is in the format <replicaId>;<Csn value>
+            byte[] cookieBytes = syncControl.getCookie();
+            String cookieString = StringTools.utf8ToString( cookieBytes );
+            
+            if ( cookieBytes == null )
+            {
+                doInitialRefresh( session, req );
+            }
+            else
+            {
+                LOG.warn( "search request received with the cookie {}", StringTools.utf8ToString( cookieBytes ) );
+                if ( !isValidCookie( cookieString ) )
+                {
+                    LOG.error( "received a invalid cookie {} from the consumer with session {}", cookieString, session );
+                    sendESyncRefreshRequired( session, req );
+                }
+                else
+                {
+                    ReplicaEventLog clientMsgLog = getReplicaEventLog( cookieString );
+                    if( clientMsgLog == null )
+                    {
+                        LOG.warn( "received a valid cookie {} but there is no event log associated with this replica", cookieString );
+                        sendESyncRefreshRequired( session, req );
+                    }
+                    else
+                    {
+                        doContentUpdate( session, req, clientMsgLog );
+                    }
+                }
+            }
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to handle the syncrepl request", e );
+
+            LdapException le = new LdapException( e.getMessage() );
+            le.initCause( e );
+
+            throw le;
+        }
+    }
+
+    
+    /**
+     * Sends to the consumer every event message stored in the given replica
+     * event log.
+     * 
+     * A message without event type is sent as a MODDN, along with its modify DN 
+     * control. ADD and MODIFY events are both sent with the ADD sync state, 
+     * DELETE events with the DELETE sync state.
+     * 
+     * The cursor on the log is always closed, even when reading or sending 
+     * a message fails.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request the responses are associated with
+     * @param clientMsgLog the event log of the replica
+     * @return the entryCSN of the last sent entry, or the log's last sent CSN
+     * when the log contained no message
+     * @throws Exception if a message can't be read or sent
+     */
+    private String sendContentFromLog( LdapSession session, InternalSearchRequest req, ReplicaEventLog clientMsgLog ) throws Exception
+    {
+        // do the search from the log
+        String lastSentCsn = clientMsgLog.getLastSentCsn();
+
+        ReplicaEventLogCursor cursor = clientMsgLog.getCursor();
+
+        try
+        {
+            while ( cursor.next() )
+            {
+                ReplicaEventMessage message = cursor.get();
+                Entry entry = message.getEntry();
+                LOG.debug( "received message from the queue {}", entry );
+
+                lastSentCsn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
+
+                EventType event = message.getEventType();
+
+                // if event type is null, then it is a MODDN operation
+                if ( event == null )
+                {
+                    sendSearchResultEntry( session, req, entry, message.getModDnControl() );
+                }
+                else
+                {
+                    SyncStateTypeEnum syncStateType = null;
+
+                    if ( event == EventType.ADD || event == EventType.MODIFY )
+                    {
+                        syncStateType = SyncStateTypeEnum.ADD;
+                    }
+                    else if ( event == EventType.DELETE )
+                    {
+                        syncStateType = SyncStateTypeEnum.DELETE;
+                    }
+
+                    sendSearchResultEntry( session, req, entry, syncStateType );
+                }
+            }
+        }
+        finally
+        {
+            // release the cursor whatever happened above
+            cursor.close();
+        }
+
+        return lastSentCsn;
+    }
+
+    
+    /**
+     * Brings a known replica up to date by sending it the content of its event
+     * log, followed by a cookie built from the replica id and the last sent CSN.
+     * 
+     * In refreshAndPersist mode the cookie goes into an intermediate response and
+     * the replica's listener is switched to realtime push. Otherwise the cookie 
+     * goes into the sync done control of the search done response.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request sent by the consumer
+     * @param replicaLog the event log of the replica
+     * @throws Exception if the content can't be sent
+     */
+    private void doContentUpdate( LdapSession session, InternalSearchRequest req, ReplicaEventLog replicaLog )
+        throws Exception
+    {
+        boolean persistRequested = isRefreshNPersist( req );
+
+        // A refreshAndPersist request reaching this method means the client went
+        // offline after having initiated a persistent sync session : the listener
+        // must be pointed to the new request and session
+        if ( persistRequested )
+        {
+            SyncReplSearchListener listener = replicaLog.getPersistentListener();
+            listener.setReq( req );
+            listener.setSession( session );
+        }
+
+        String csn = sendContentFromLog( session, req, replicaLog );
+
+        String cookieText = replicaLog.getId() + REPLICA_ID_DELIM + csn;
+        byte[] cookie = StringTools.getBytesUtf8( cookieText );
+
+        if ( !persistRequested )
+        {
+            // refreshOnly : terminate the search with the cookie in the done control
+            InternalSearchResponseDone doneResponse = ( InternalSearchResponseDone ) req.getResultResponse();
+            doneResponse.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS );
+
+            SyncDoneValueControl doneControl = new SyncDoneValueControl();
+            doneControl.setCookie( cookie );
+            doneResponse.add( doneControl );
+
+            session.getIoSession().write( doneResponse );
+        }
+        else
+        {
+            // refreshAndPersist : hand over the new cookie, then go realtime
+            SyncInfoValueControl newCookieInfo = new SyncInfoValueControl( SynchronizationInfoEnum.NEW_COOKIE );
+            newCookieInfo.setCookie( cookie );
+
+            InternalIntermediateResponse intermediate = new IntermediateResponseImpl( req.getMessageId() );
+            intermediate.setResponseName( SyncInfoValueControl.CONTROL_OID );
+            intermediate.setResponseValue( newCookieInfo.getValue() );
+
+            session.getIoSession().write( intermediate );
+
+            replicaLog.getPersistentListener().setPushInRealTime( true );
+        }
+
+        replicaLog.setLastSentCsn( csn );
+    }
+
+
+    /**
+     * Serves a sync request which carries no cookie.
+     * 
+     * A replica event log and a listener are created and registered first, with
+     * a filter selecting the entries whose entryCSN is greater than or equal to
+     * the context CSN. The initial content, the entries whose entryCSN is lower 
+     * than or equal to the context CSN, is then sent through a plain search.
+     * 
+     * When the search succeeds, a refreshAndPersist consumer receives the content 
+     * of the log and an intermediate response holding the new cookie, and its 
+     * listener is switched to realtime push. A refreshOnly consumer receives the 
+     * search done response carrying the cookie. The replica is then stored in the 
+     * DIT and added to the replica log map.
+     * 
+     * When the search doesn't succeed, the log is truncated and the listener is
+     * removed.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request sent by the consumer. Its filter is replaced
+     * by this method.
+     * @throws Exception if the content can't be read or sent
+     */
+    private void doInitialRefresh( LdapSession session, InternalSearchRequest req )
+        throws Exception
+    {
+
+        // keep the filter as sent by the consumer, and the consumer's host name,
+        // they are handed to the replica log creation below
+        String originalFilter = req.getFilter().toString();
+        InetSocketAddress address = ( InetSocketAddress )session.getIoSession().getRemoteAddress();
+        String hostName = address.getAddress().getHostName();
+
+        ExprNode modifiedFilter = modifyFilter( session, req );
+
+        String contextCsn = dirService.getContextCsn();
+        
+        boolean refreshNPersist = isRefreshNPersist( req );
+
+        // first register a persistent search handler before starting the initial content refresh
+        // this is to log all the operations that happen on the DIT during the initial content refresh
+        
+        ReplicaEventLog replicaLog = createRelicaEventLog( hostName, originalFilter );
+
+        replicaLog.setRefreshNPersist( refreshNPersist );
+
+        // modify the filter to include the context Csn
+        GreaterEqNode csnGeNode = new GreaterEqNode( SchemaConstants.ENTRY_CSN_AT, new StringValue( contextCsn ) );
+        ExprNode postInitContentFilter = new AndNode( modifiedFilter, csnGeNode );
+        req.setFilter( postInitContentFilter );
+        
+        // now we process entries forever as they change
+        LOG.info( "starting persistent search for the client {}", replicaLog );
+        
+        // irrespective of the sync mode set the 'isRealtimePush' to false initially so that we can
+        // store the modifications in the queue and later if it is a persist mode
+        // we push this queue's content and switch to realtime mode
+        SyncReplSearchListener handler = new SyncReplSearchListener( session, req, replicaLog, false );
+        replicaLog.setPersistentListener( handler );
+
+        // compose notification criteria and add the listener to the event 
+        // service using that notification criteria to determine which events 
+        // are to be delivered to the persistent search issuing client
+        NotificationCriteria criteria = new NotificationCriteria();
+        criteria.setAliasDerefMode( req.getDerefAliases() );
+        criteria.setBase( req.getBase() );
+        criteria.setFilter( req.getFilter() );
+        criteria.setScope( req.getScope() );
+        criteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
+        
+        replicaLog.setSearchCriteria( criteria );
+        
+        dirService.getEventService().addListener( handler, criteria );
+
+        
+        // then start pushing initial content
+        LessEqNode csnNode = new LessEqNode( SchemaConstants.ENTRY_CSN_AT, new StringValue( contextCsn ) );
+
+        // modify the filter to include the context Csn
+        ExprNode initialContentFilter = new AndNode( modifiedFilter, csnNode );
+        req.setFilter( initialContentFilter );
+
+        // NOTE(review): if this search throws, the listener registered above is not
+        // removed and the replica log is not truncated - confirm whether a cleanup
+        // is needed here
+        InternalSearchResponseDone searchDoneResp = doSimpleSearch( session, req );
+        
+        if ( searchDoneResp.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
+        {
+            replicaLog.setLastSentCsn( contextCsn );
+            byte[] cookie = StringTools.getBytesUtf8( replicaLog.getId() + REPLICA_ID_DELIM + contextCsn ) ;
+            
+            if( refreshNPersist ) // refreshAndPersist mode
+            {
+                // send what was logged during the initial search, the cookie is
+                // rebuilt with the CSN returned for that content
+                contextCsn = sendContentFromLog( session, req, replicaLog );
+                cookie = StringTools.getBytesUtf8( replicaLog.getId() + REPLICA_ID_DELIM + contextCsn ) ;
+                
+                InternalIntermediateResponse intermResp = new IntermediateResponseImpl( req.getMessageId() );
+                intermResp.setResponseName( SyncInfoValueControl.CONTROL_OID );
+                
+                SyncInfoValueControl syncInfo = new SyncInfoValueControl( SynchronizationInfoEnum.NEW_COOKIE );
+                syncInfo.setCookie( cookie );
+                intermResp.setResponseValue( syncInfo.getValue() );
+                
+                session.getIoSession().write( intermResp );
+                
+                // switch the handler mode to realtime push
+                handler.setPushInRealTime( refreshNPersist );
+            }
+            else
+            {
+                // no need to send from the log, that will be done in the next refreshOnly session
+                SyncDoneValueControl syncDone = new SyncDoneValueControl();
+                syncDone.setCookie( cookie );
+                searchDoneResp.add( syncDone );
+                session.getIoSession().write( searchDoneResp );
+            }
+        }
+        else // if not succeeded return
+        {
+            // NOTE(review): the search done response is not written in this branch -
+            // confirm the consumer gets informed of the failure elsewhere
+            LOG.warn( "initial content refresh didn't succeed due to {}", searchDoneResp.getLdapResult().getResultCode() );
+            replicaLog.truncate();
+            replicaLog = null;
+            
+            // remove the listener
+            dirService.getEventService().removeListener( handler );
+            
+            return;
+        }
+        
+        // if all is well then store the consumer info
+        replicaUtil.addConsumerEntry( replicaLog );
+        
+        // add to the map only after storing in the DIT, else the Replica update thread barfs
+        replicaLogMap.put( replicaLog.getId(), replicaLog );
+    }
+
+
+    /**
+     * Runs the search described by the request and sends every found entry to
+     * the consumer, within the applicable size and time limits.
+     * 
+     * The search done response is NOT written to the session, it is returned 
+     * with its result code set so that the caller can complete it.
+     * 
+     * The cursor is always closed, including when positioning it fails.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request to execute
+     * @return the search done response of the request, holding the result code
+     * @throws Exception if the search fails
+     */
+    private InternalSearchResponseDone doSimpleSearch( LdapSession session, InternalSearchRequest req )
+        throws Exception
+    {
+        InternalSearchResponseDone searchDoneResp = ( InternalSearchResponseDone ) req.getResultResponse();
+        InternalLdapResult ldapResult = searchDoneResp.getLdapResult();
+
+        // A normal search
+        EntryFilteringCursor cursor = session.getCoreSession().search( req );
+
+        /*
+         * Iterate through all search results building and sending back responses
+         * for each search result returned.
+         */
+        try
+        {
+            // Position the cursor at the beginning. This is done inside the try 
+            // block so that the cursor gets closed if the positioning fails
+            cursor.beforeFirst();
+
+            // Get the size limits
+            // Don't bother setting size limits for administrators that don't ask for it
+            long serverLimit = getServerSizeLimit( session, req );
+
+            long requestLimit = req.getSizeLimit() == 0L ? Long.MAX_VALUE : req.getSizeLimit();
+
+            req.addAbandonListener( new SearchAbandonListener( ldapServer, cursor ) );
+            setTimeLimitsOnCursor( req, session, cursor );
+            LOG.debug( "using <{},{}> for size limit", requestLimit, serverLimit );
+            long sizeLimit = min( requestLimit, serverLimit );
+
+            readResults( session, req, ldapResult, cursor, sizeLimit );
+        }
+        finally
+        {
+            try
+            {
+                cursor.close();
+            }
+            catch ( Exception e )
+            {
+                // a failure to close must not hide the result of the search
+                LOG.error( I18n.err( I18n.ERR_168 ), e );
+            }
+        }
+
+        return searchDoneResp;
+    }
+
+
+    /**
+     * Walks the cursor and sends each entry to the consumer with the ADD sync
+     * state, until the size limit is reached, the cursor is exhausted, the
+     * session is closing or the request has been abandoned.
+     * 
+     * The result code is set to SUCCESS, or to SIZE_LIMIT_EXCEEDED when the
+     * limit has been reached while the cursor still holds entries. The response 
+     * itself is not written.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request being served
+     * @param ldapResult the result in which the result code is stored
+     * @param cursor the cursor on the found entries
+     * @param sizeLimit the maximum number of entries to send
+     * @throws Exception if an entry can't be read or sent
+     */
+    private void readResults( LdapSession session, InternalSearchRequest req, InternalLdapResult ldapResult,
+        EntryFilteringCursor cursor, long sizeLimit ) throws Exception
+    {
+        long sent = 0;
+
+        for ( ; ( sent < sizeLimit ) && cursor.next(); sent++ )
+        {
+            // The client has closed the connection : stop here
+            if ( session.getIoSession().isClosing() )
+            {
+                LOG.debug( "Request terminated for message {}, the client has closed the session", req.getMessageId() );
+                break;
+            }
+
+            // The cursor has been closed by an abandon request : stop here
+            if ( req.isAbandoned() )
+            {
+                LOG.debug( "Request terminated by an AbandonRequest for message {}", req.getMessageId() );
+                break;
+            }
+
+            ClonedServerEntry current = cursor.get();
+            sendSearchResultEntry( session, req, current, SyncStateTypeEnum.ADD );
+        }
+
+        // The response is not written here, only its result code is set
+        ldapResult.setResultCode( ResultCodeEnum.SUCCESS );
+
+        boolean limitReached = sent >= sizeLimit;
+
+        if ( limitReached && cursor.next() )
+        {
+            // There is at least one more entry than allowed. We moved forward to
+            // find that out, so step back to restore the cursor position
+            cursor.previous();
+            ldapResult.setResultCode( ResultCodeEnum.SIZE_LIMIT_EXCEEDED );
+        }
+    }
+
+
+    /**
+     * Sends an entry to the consumer along with a sync state control holding
+     * the given state type and the entryUUID of the entry.
+     * 
+     * For the DELETE state the entry is emptied first, only its DN and its 
+     * entryUUID attribute are sent.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request the response is associated with
+     * @param entry the entry to send
+     * @param syncStateType the sync state to advertise for this entry
+     * @throws Exception if the response can't be built
+     */
+    private void sendSearchResultEntry( LdapSession session, InternalSearchRequest req, Entry entry,
+        SyncStateTypeEnum syncStateType ) throws Exception
+    {
+        EntryAttribute entryUuid = entry.get( SchemaConstants.ENTRY_UUID_AT );
+        byte[] uuidBytes = StringTools.uuidToBytes( entryUuid.getString() );
+
+        SyncStateValueControl stateControl = new SyncStateValueControl();
+        stateControl.setSyncStateType( syncStateType );
+        stateControl.setEntryUUID( uuidBytes );
+
+        boolean isDeletion = ( syncStateType == SyncStateTypeEnum.DELETE );
+
+        if ( isDeletion )
+        {
+            // strip everything but the DN, then put the entryUUID back
+            entry.clear();
+            entry.add( entryUuid );
+        }
+
+        InternalResponse response = generateResponse( session, req, entry );
+        response.add( stateControl );
+
+        session.getIoSession().write( response );
+        LOG.debug( "Sending {}", entry.getDn() );
+    }
+
+
+    /**
+     * Sends an entry to the consumer for a modify DN operation : the response
+     * carries a sync state control with the MODDN state and the entryUUID of
+     * the entry, plus the given modify DN control.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request the response is associated with
+     * @param entry the entry to send
+     * @param modDnControl the control describing the modify DN operation
+     * @throws Exception if the response can't be built
+     */
+    private void sendSearchResultEntry( LdapSession session, InternalSearchRequest req, Entry entry,
+        SyncModifyDnControl modDnControl ) throws Exception
+    {
+        EntryAttribute entryUuid = entry.get( SchemaConstants.ENTRY_UUID_AT );
+        byte[] uuidBytes = StringTools.uuidToBytes( entryUuid.getString() );
+
+        SyncStateValueControl stateControl = new SyncStateValueControl();
+        stateControl.setSyncStateType( SyncStateTypeEnum.MODDN );
+        stateControl.setEntryUUID( uuidBytes );
+
+        InternalResponse response = generateResponse( session, req, entry );
+        response.add( stateControl );
+        response.add( modDnControl );
+
+        session.getIoSession().write( response );
+        LOG.debug( "Sending {}", entry.getDn() );
+    }
+
+    
+    /**
+     * Builds the response to send for the given entry.
+     * 
+     * An entry holding a ref attribute is returned as a search result reference,
+     * unless the request carries the ManageDsaIT control. Each LDAP URL of the
+     * ref attribute is added with the scope adapted to the scope of the request. 
+     * URLs which are not LDAP URLs are added as is, and URLs which can't be 
+     * parsed are logged and left out.
+     * 
+     * Any other entry is returned as a search result entry, without its
+     * userPassword attribute when the directory service hides the passwords.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request the response is associated with
+     * @param entry the entry to send
+     * @return a search result reference or a search result entry
+     * @throws IllegalStateException if a referral has to be returned for a
+     * request whose scope is neither SUBTREE nor ONELEVEL
+     * @throws Exception if the response can't be built
+     */
+    private InternalResponse generateResponse( LdapSession session, InternalSearchRequest req, Entry entry )
+        throws Exception
+    {
+        EntryAttribute ref = entry.get( SchemaConstants.REF_AT );
+        boolean hasManageDsaItControl = req.getControls().containsKey( ManageDsaITControl.CONTROL_OID );
+
+        if ( ( ref != null ) && !hasManageDsaItControl )
+        {
+            // The entry is a referral.
+            InternalSearchResponseReference respRef;
+            respRef = new SearchResponseReferenceImpl( req.getMessageId() );
+            respRef.setReferral( new ReferralImpl() );
+
+            for ( Value<?> val : ref )
+            {
+                String url = val.getString();
+
+                if ( !url.startsWith( "ldap" ) )
+                {
+                    // Not an LDAP URL : return it untouched, and don't try to
+                    // parse it as an LDAP URL, which would add a second, bogus URL
+                    respRef.getReferral().addLdapUrl( url );
+                    continue;
+                }
+
+                LdapURL ldapUrl = new LdapURL();
+                ldapUrl.setForceScopeRendering( true );
+
+                try
+                {
+                    ldapUrl.parse( url.toCharArray() );
+                }
+                catch ( LdapURLEncodingException e )
+                {
+                    // A URL which could not be parsed can't be rendered properly,
+                    // so it is not added to the referral
+                    LOG.error( I18n.err( I18n.ERR_165, url, entry ) );
+                    continue;
+                }
+
+                switch ( req.getScope() )
+                {
+                    case SUBTREE:
+                        ldapUrl.setScope( SearchScope.SUBTREE.getScope() );
+                        break;
+
+                    case ONELEVEL: // one level here is object level on remote server
+                        ldapUrl.setScope( SearchScope.OBJECT.getScope() );
+                        break;
+
+                    default:
+                        throw new IllegalStateException( I18n.err( I18n.ERR_686 ) );
+                }
+
+                respRef.getReferral().addLdapUrl( ldapUrl.toString() );
+            }
+
+            return respRef;
+        }
+        else
+        {
+            // The entry is not a referral, or the ManageDsaIt control is set
+            InternalSearchResponseEntry respEntry;
+            respEntry = new SearchResponseEntryImpl( req.getMessageId() );
+            respEntry.setEntry( entry );
+            respEntry.setObjectName( entry.getDn() );
+
+            // Filter the userPassword if the server mandates to do so
+            if ( session.getCoreSession().getDirectoryService().isPasswordHidden() )
+            {
+                // Remove the userPassword attribute from the entry.
+                respEntry.getEntry().removeAttributes( SchemaConstants.USER_PASSWORD_AT );
+            }
+
+            return respEntry;
+        }
+    }
+
+
+    /**
+     * Returns the size limit to apply on the server side : the limit of the
+     * request for an administrator, the maximum size limit of the Ldap server
+     * for anybody else. A limit equal to NO_SIZE_LIMIT is returned as 
+     * Long.MAX_VALUE.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param request the search request being served
+     * @return the size limit to apply
+     */
+    private long getServerSizeLimit( LdapSession session, InternalSearchRequest request )
+    {
+        boolean isAdmin = session.getCoreSession().isAnAdministrator();
+
+        long limit = isAdmin ? request.getSizeLimit() : ldapServer.getMaxSizeLimit();
+
+        return ( limit == NO_SIZE_LIMIT ) ? Long.MAX_VALUE : limit;
+    }
+
+
+    /**
+     * Installs a time limiting closure monitor on the cursor, when a time
+     * limit applies.
+     * 
+     * No monitor is installed when the request asks for no time limit and 
+     * either the user is an administrator or the server has no maximum time
+     * limit. Otherwise the limit of the request is used when it is set and 
+     * doesn't exceed the server's maximum, and the server's maximum is used 
+     * in every other case.
+     * 
+     * @param req the search request being served
+     * @param session the Ldap session of the consumer
+     * @param cursor the cursor to constrain
+     */
+    private void setTimeLimitsOnCursor( InternalSearchRequest req, LdapSession session,
+        final EntryFilteringCursor cursor )
+    {
+        boolean requestUnlimited = req.getTimeLimit() == NO_TIME_LIMIT;
+
+        if ( requestUnlimited )
+        {
+            // Don't bother setting time limits for administrators, nor when
+            // the server itself doesn't limit the search time
+            boolean isAdmin = session.getCoreSession().isAnAdministrator();
+            boolean serverUnlimited = ldapServer.getMaxTimeLimit() == NO_TIME_LIMIT;
+
+            if ( isAdmin || serverUnlimited )
+            {
+                return;
+            }
+        }
+
+        /*
+         * The limit of the request wins only if it is set and fits within the
+         * maximum configured in the server. An unset limit, or a limit greater 
+         * than the server's maximum, is replaced by the server's maximum.
+         */
+        boolean useRequestLimit = ( req.getTimeLimit() != 0 )
+            && ( ldapServer.getMaxTimeLimit() >= req.getTimeLimit() );
+
+        cursor.setClosureMonitor( new SearchTimeLimitingMonitor(
+            useRequestLimit ? req.getTimeLimit() : ldapServer.getMaxTimeLimit(), TimeUnit.SECONDS ) );
+    }
+
+
+    /**
+     * Computes the filter to use for the given request : the filter of the
+     * request OR'ed with (objectClass=referral), so that the referrals are
+     * returned too.
+     * 
+     * The filter of the request is returned unchanged when the request carries
+     * the ManageDsaIT control, or when it is a presence filter on the objectClass 
+     * attribute, as
+     * 
+     *  (| (objectClass=referral)(objectClass=*)) == (objectClass=*)
+     * 
+     * The request itself is not modified.
+     * 
+     * @param session the Ldap session of the consumer
+     * @param req the search request holding the filter
+     * @return the filter to use
+     * @throws Exception if the attribute type of a presence filter can't be found
+     */
+    public ExprNode modifyFilter( LdapSession session, InternalSearchRequest req ) throws Exception
+    {
+        ExprNode requestFilter = req.getFilter();
+
+        // Most of the time the search filter is just (objectClass=*), detect it
+        boolean isOcPresenceFilter = false;
+
+        if ( requestFilter instanceof PresenceNode )
+        {
+            String attribute = ( ( PresenceNode ) requestFilter ).getAttribute();
+
+            AttributeType at = session.getCoreSession().getDirectoryService().getSchemaManager()
+                .lookupAttributeTypeRegistry( attribute );
+
+            isOcPresenceFilter = at.getOid().equals( SchemaConstants.OBJECT_CLASS_AT_OID );
+        }
+
+        if ( req.hasControl( ManageDsaITControl.CONTROL_OID ) || isOcPresenceFilter )
+        {
+            return requestFilter;
+        }
+
+        return new OrNode( requestFilter, newIsReferralEqualityNode( session ) );
+    }
+
+
+    /**
+     * Creates a new (objectClass=referral) equality node. The objectClass
+     * attribute type is looked up the first time this method is called, and
+     * kept for the next calls.
+     * 
+     * @param session the Ldap session giving access to the schema manager
+     * @return a new equality node
+     * @throws Exception if the objectClass attribute type can't be found
+     */
+    private EqualityNode<String> newIsReferralEqualityNode( LdapSession session ) throws Exception
+    {
+        if ( objectClassAttributeType == null )
+        {
+            objectClassAttributeType = session.getCoreSession().getDirectoryService().getSchemaManager()
+                .lookupAttributeTypeRegistry( SchemaConstants.OBJECT_CLASS_AT );
+        }
+
+        StringValue referralValue = new StringValue( objectClassAttributeType, SchemaConstants.REFERRAL_OC );
+
+        return new EqualityNode<String>( SchemaConstants.OBJECT_CLASS_AT, referralValue );
+    }
+
+
+    /**
+     * Persists the last sent CSN of every replica log that is marked dirty and
+     * clears its dirty flag afterwards.
+     * 
+     * Any exception stops the pass and is logged rather than propagated.
+     */
+    private void storeReplicaInfo()
+    {
+        try
+        {
+            for ( ReplicaEventLog replicaLog : replicaLogMap.values() )
+            {
+                if ( !replicaLog.isDirty() )
+                {
+                    // nothing changed for this replica since the last pass
+                    continue;
+                }
+
+                LOG.debug( "updating the details of replica {}", replicaLog );
+                replicaUtil.updateReplicaLastSentCsn( replicaLog );
+                replicaLog.setDirty( false );
+            }
+        }
+        catch ( Exception ex )
+        {
+            LOG.error( "Failed to store the replica information", ex );
+        }
+    }
+    
+    
+    /**
+     * Loads the replica consumers known to replicaUtil, configures each one
+     * with the ActiveMQ connection and broker, and registers it in
+     * replicaLogMap under its id.
+     * 
+     * replicaCount is raised to the highest id encountered. Any exception is
+     * logged rather than propagated.
+     */
+    private void loadReplicaInfo()
+    {
+        try
+        {
+            List<ReplicaEventLog> consumers = replicaUtil.getReplicaConsumers();
+
+            if ( consumers.isEmpty() )
+            {
+                LOG.debug( "no replica logs found to initialize" );
+                return;
+            }
+
+            for ( ReplicaEventLog consumerLog : consumers )
+            {
+                LOG.debug( "initializing the replica log from {}", consumerLog.getId() );
+                consumerLog.configure( amqConnection, brokerService );
+                replicaLogMap.put( consumerLog.getId(), consumerLog );
+
+                // keep the counter at the highest known id so that new replica(s) get a correct value
+                if ( replicaCount.get() < consumerLog.getId() )
+                {
+                    replicaCount.set( consumerLog.getId() );
+                }
+            }
+        }
+        catch ( Exception ex )
+        {
+            LOG.error( "Failed to load the replica information", ex );
+        }
+    }
+    
+ 
+    /**
+     * Registers a SyncReplSearchListener with the directory's event service
+     * for every replica log that has search criteria.
+     * 
+     * Replica logs without search criteria are skipped with a warning.
+     * 
+     * @throws Exception propagated from the listener creation or registration
+     */
+    private void registerPersistentSearches() throws Exception
+    {
+        for ( ReplicaEventLog replicaLog : replicaLogMap.values() )
+        {
+            if ( replicaLog.getSearchCriteria() == null )
+            {
+                LOG.warn( "invalid peristent search criteria {} for the replica {}", replicaLog.getSearchCriteria(), replicaLog.getId() );
+                continue;
+            }
+
+            LOG.debug( "registering peristent search for the replica {}", replicaLog.getId() );
+
+            SyncReplSearchListener listener = new SyncReplSearchListener( null, null, replicaLog, false );
+            replicaLog.setPersistentListener( listener );
+
+            dirService.getEventService().addListener( listener, replicaLog.getSearchCriteria() );
+        }
+    }
+    
+    
+    /**
+     * Creates the task that periodically stores the replica information.
+     * 
+     * The task calls storeReplicaInfo() and then sleeps for 10 seconds, over
+     * and over. When the executing thread is interrupted the task logs a
+     * warning, restores the thread's interrupt status and ends, so the thread
+     * can be stopped by interrupting it.
+     * 
+     * @return the runnable to be executed on a dedicated thread
+     */
+    private Runnable createConsumerInfoUpdateTask()
+    {
+        Runnable task = new Runnable()
+        {
+            public void run()
+            {
+                while ( true )
+                {
+                    storeReplicaInfo();
+
+                    try
+                    {
+                        Thread.sleep( 10000 );
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        LOG.warn( "thread storing the replica information was interrupted", e );
+
+                        // sleep() cleared the flag: set it again for whoever owns this thread
+                        // and stop looping instead of ignoring the request to terminate
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            }
+        };
+
+        return task;
+    }
+    
+ 
+    /**
+     * Tells whether the given cookie has the form "&lt;replicaId&gt;&lt;delimiter&gt;&lt;csn&gt;",
+     * e.g. "0;&lt;csn&gt;" or "11;&lt;csn&gt;".
+     * 
+     * The cookie is rejected when it is null or blank, when the delimiter is
+     * missing or is the first character, when the part before the delimiter
+     * is not an integer, when nothing follows the delimiter, or when the part
+     * after the delimiter is not accepted by Csn.isValid().
+     * 
+     * @param cookieString the cookie to check, may be null
+     * @return true if the cookie is well formed, false otherwise
+     */
+    private boolean isValidCookie( String cookieString )
+    {
+        if ( cookieString == null || cookieString.trim().length() == 0 )
+        {
+            return false;
+        }
+
+        int pos = cookieString.indexOf( REPLICA_ID_DELIM );
+
+        // the delimiter must be at index 1 or higher, at least one character of replica id precedes it
+        if ( pos <= 0 )
+        {
+            return false;
+        }
+
+        String replicaId = cookieString.substring( 0, pos );
+
+        try
+        {
+            Integer.parseInt( replicaId );
+        }
+        catch ( NumberFormatException e )
+        {
+            LOG.debug( "Failed to parse the replica id {}", replicaId );
+            return false;
+        }
+
+        // indexOf() never returns length(), so the delimiter being the last character
+        // (no CSN after it) has to be detected with length() - 1.
+        // Assumes a single character delimiter, as the pos + 1 offset below does.
+        if ( pos == cookieString.length() - 1 )
+        {
+            return false;
+        }
+
+        String csnString = cookieString.substring( pos + 1 );
+
+        return Csn.isValid( csnString );
+    }
+    
+    
+    /**
+     * Extracts the replica id, i.e. the part of the cookie that precedes the
+     * first replica id delimiter, as an int.
+     * 
+     * No validation is done here: a cookie without delimiter or with a non
+     * numeric prefix makes this method throw an unchecked exception.
+     * 
+     * @param cookieString the cookie to read the replica id from
+     * @return the replica id
+     */
+    private int getReplicaId( String cookieString )
+    {
+        int delimPos = cookieString.indexOf( REPLICA_ID_DELIM );
+
+        return Integer.parseInt( cookieString.substring( 0, delimPos ) );
+    }
+    
+    
+    /**
+     * Finds the replica event log registered under the replica id contained
+     * in the given cookie.
+     * 
+     * @param cookieString the cookie sent by the consumer
+     * @return the matching replica log, or null if the cookie is not valid or
+     * no log is registered for its replica id
+     * @throws Exception declared for callers, nothing is thrown explicitly here
+     */
+    private ReplicaEventLog getReplicaEventLog( String cookieString ) throws Exception
+    {
+        if ( !isValidCookie( cookieString ) )
+        {
+            return null;
+        }
+
+        return replicaLogMap.get( getReplicaId( cookieString ) );
+    }
+    
+    
+    /**
+     * Creates and configures a new replica event log, giving it the next
+     * value of replicaCount as its id.
+     * 
+     * The returned log is not added to replicaLogMap by this method.
+     * 
+     * @param hostName the host name to set on the new log
+     * @param filter the search filter to set on the new log
+     * @return the configured replica event log
+     * @throws Exception propagated from the configuration of the log
+     */
+    private ReplicaEventLog createRelicaEventLog( String hostName, String filter ) throws Exception
+    {
+        int newId = replicaCount.incrementAndGet();
+        LOG.debug( "creating a new event log for the replica with id {}", newId );
+
+        ReplicaEventLog eventLog = new ReplicaEventLog( newId );
+        eventLog.setHostName( hostName );
+        eventLog.setSearchFilter( filter );
+        eventLog.configure( amqConnection, brokerService );
+
+        return eventLog;
+    }
+    
+    
+    /**
+     * Answers the search request with a search done response whose result
+     * code is E_SYNC_REFRESH_REQUIRED and which carries a new, empty
+     * SyncDoneValueControl.
+     * 
+     * @param session the LDAP session the response is written to
+     * @param req the search request being answered
+     * @throws Exception declared for callers, nothing is thrown explicitly here
+     */
+    private void sendESyncRefreshRequired( LdapSession session, InternalSearchRequest req ) throws Exception
+    {
+        InternalSearchResponseDone done = ( InternalSearchResponseDone ) req.getResultResponse();
+
+        done.getLdapResult().setResultCode( ResultCodeEnum.E_SYNC_REFRESH_REQUIRED );
+        done.add( new SyncDoneValueControl() );
+
+        session.getIoSession().write( done );
+    }
+    
+    
+    /**
+     * Tells whether the sync request control of the given search request asks
+     * for the refreshAndPersist mode.
+     * 
+     * NOTE(review): the control is dereferenced without a null check, callers
+     * presumably make sure the request carries it - confirm.
+     * 
+     * @param req the search request holding the sync request control
+     * @return true if the control's mode is REFRESH_AND_PERSIST
+     */
+    private boolean isRefreshNPersist( InternalSearchRequest req )
+    {
+        SyncRequestValueControl syncRequest = ( SyncRequestValueControl ) req.getControls().get( SyncRequestValueControl.CONTROL_OID );
+
+        return syncRequest.getMode() == SynchronizationModeEnum.REFRESH_AND_PERSIST;
+    }
+}



Mime
View raw message