directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kayyag...@apache.org
Subject svn commit: r750091 - /directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/
Date Wed, 04 Mar 2009 18:25:04 GMT
Author: kayyagari
Date: Wed Mar  4 18:25:03 2009
New Revision: 750091

URL: http://svn.apache.org/viewvc?rev=750091&view=rev
Log:
 initial code drop for syncrepl consumer agent

Added:
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolCodecFactory.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolDecoder.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolEncoder.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConfiguration.java
    directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java

Added: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java?rev=750091&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java (added)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java Wed Mar  4 18:25:03 2009
@@ -0,0 +1,124 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.mitosis.syncrepl;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.directory.shared.ldap.codec.LdapMessage;
+import org.apache.directory.shared.ldap.codec.LdapResponse;
+import org.apache.directory.shared.ldap.codec.search.SearchRequest;
+import org.apache.directory.shared.ldap.codec.search.SearchResultEntry;
+
+
+/**
+ * 
+ *  Describe the methods to be implemented by the LdapConnection class.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public interface LdapConnection
+{
+    /**
+     * Connect to the remote LDAP server
+     *
+     * @return <code>true</code> if the connection is established, false otherwise
+     * @throws IOException if some I/O error occurs
+     */
+    boolean connect() throws IOException; 
+    
+    /**
+     * Disconnect from the remote LDAP server
+     *
+     * @return <code>true</code> if the connection is closed, false otherwise
+     * @throws IOException if some I/O error occurs
+     */
+    boolean close() throws IOException; 
+    
+    /**
+     * Simple Bind on a server
+     *
+     * @param name The name we use to authenticate the user. It must be a 
+     * valid DN
+     * @param credentials The password. It can't be null 
+     * @return The BindResponse LdapResponse 
+     */
+    LdapResponse bind( String name, String credentials ) throws Exception;
+    
+    /**
+     * Do a search, on the base object, using the given filter. The
+     * SearchRequest parameters default to :
+     * Scope : ONE
+     * DerefAlias : ALWAYS
+     * SizeLimit : none
+     * TimeLimit : none
+     * TypesOnly : false
+     * Attributes : all the user's attributes 
+     * 
+     * @param baseObject The base for the search. It must be a valid
+     * DN, and can't be emtpy
+     * @param filter The filter to use for this search. It can't be empty
+     * @return An Object array of size 2 containing the result entries
+     * at the 0th index and the syncdone value at 1st index
+     */
+    Object[] search( String baseObject, String filter ) throws Exception;
+    
+    
+    /**
+     * Send the already built SearchRequest to the server.
+     *  
+     * @param searchRequest the SearchRequest object to send to the server
+     * @return An Object array of size 2 containing the result entries
+     * at the 0th index and the syncdone value at 1st index
+     */
+    Object[] search( SearchRequest searchRequest ) throws Exception;
+    
+    /*
+    void search( String baseObject, SearchScope scope, int derefAlias,
+        int sizeLimit, int timeLimit, boolean typesOnly, String filter, 
+        String[] attributes );
+    
+    void search( String baseObject, SearchScope scope, int derefAlias,
+        int sizeLimit, int timeLimit, boolean typesOnly, String filter, 
+        String[] attributes );
+    
+    void search( String baseObject, SearchScope scope, int derefAlias,
+        int sizeLimit, int timeLimit, boolean typesOnly, String filter, 
+        String[] attributes );
+    
+    void search( String baseObject, SearchScope scope, int derefAlias,
+        int sizeLimit, int timeLimit, boolean typesOnly, String filter, 
+        String[] attributes );
+    */
+    
+    /**
+     * UnBind from a server
+     */
+    void unBind() throws Exception;
+    
+    
+    /**
+     * Return the response stored into the current session.
+     *
+     * @return The last request response
+     */
+    LdapMessage getResponse();
+}

Added: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java?rev=750091&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java (added)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java Wed Mar  4 18:25:03 2009
@@ -0,0 +1,701 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.mitosis.syncrepl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import javax.naming.InvalidNameException;
+import javax.net.ssl.SSLContext;
+
+import org.apache.directory.shared.asn1.ber.IAsn1Container;
+import org.apache.directory.shared.ldap.codec.LdapConstants;
+import org.apache.directory.shared.ldap.codec.LdapMessage;
+import org.apache.directory.shared.ldap.codec.LdapMessageContainer;
+import org.apache.directory.shared.ldap.codec.LdapResponse;
+import org.apache.directory.shared.ldap.codec.TwixTransformer;
+import org.apache.directory.shared.ldap.codec.bind.BindRequest;
+import org.apache.directory.shared.ldap.codec.bind.SimpleAuthentication;
+import org.apache.directory.shared.ldap.codec.search.Filter;
+import org.apache.directory.shared.ldap.codec.search.SearchRequest;
+import org.apache.directory.shared.ldap.codec.search.SearchResultDone;
+import org.apache.directory.shared.ldap.codec.search.SearchResultEntry;
+import org.apache.directory.shared.ldap.codec.unbind.UnBindRequest;
+import org.apache.directory.shared.ldap.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.filter.ExprNode;
+import org.apache.directory.shared.ldap.filter.FilterParser;
+import org.apache.directory.shared.ldap.filter.SearchScope;
+import org.apache.directory.shared.ldap.message.ResultCodeEnum;
+import org.apache.directory.shared.ldap.name.LdapDN;
+import org.apache.directory.shared.ldap.util.StringTools;
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.ssl.SslFilter;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * An implementation of a Ldap connection
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdapConnectionImpl extends IoHandlerAdapter implements LdapConnection
+{
+    /** logger for reporting errors that might not be handled properly upstream */
+    private static final Logger LOG = LoggerFactory.getLogger( LdapConnectionImpl.class );
+
+    /** Define the default ports for LDAP and LDAPS */
+    private static final int DEFAULT_LDAP_PORT = 389; 
+    private static final int DEFAULT_LDAPS_PORT = 686;
+    
+    /** The default host : localhost */
+    private static final String DEFAULT_LDAP_HOST = "127.0.0.1";
+    
+    /** The LDAP version */
+    private static int LDAP_V3 = 3;
+    
+    private static final String LDAP_RESPONSE = "LdapReponse";
+    
+    /** A flag indicating if we are using SSL or not */
+    private boolean useSsl = false;
+    
+    /** The default timeout for operation : 30 seconds */
+    private static final long DEFAULT_TIMEOUT = 30000L;
+    
+    /** The timeout used for response we are waiting for */ 
+    private long timeOut = DEFAULT_TIMEOUT;
+    
+    /** The selected LDAP port */
+    private int ldapPort;
+    
+    /** the remote LDAP host */
+    private String ldapHost;
+    
+    /** The connector open with the remote server */
+    private NioSocketConnector connector;
+    
+    /** The Ldap codec */
+    private IoFilter ldapProtocolFilter = new ProtocolCodecFilter(
+            new LdapProtocolCodecFactory() );
+
+    /**  
+     * The created session, created when we open a connection with
+     * the Ldap server.
+     */
+    private IoSession ldapSession;
+    
+    /** A Message ID which is incremented for each operation */
+    private int messageId;
+    
+    /** A queue used to store the incoming responses */
+    private BlockingQueue<LdapMessage> responseQueue;
+    
+    /** An operation mutex to guarantee the operation order */
+    private Semaphore operationMutex;
+    
+    
+
+    //------------------------- The constructors --------------------------//
+    /**
+     * Create a new instance of a LdapConnection on localhost,
+     * port 389.
+     */
+    public LdapConnectionImpl()
+    {
+        setSsl( false );
+        ldapPort = DEFAULT_LDAP_PORT;
+        ldapHost = DEFAULT_LDAP_HOST;
+        messageId = 1;
+        operationMutex = new Semaphore(1);
+    }
+    
+    
+    /**
+     * Create a new instance of a LdapConnection on localhost,
+     * port 389 if the SSL flag is off, or 686 otherwise.
+     * 
+     * @param useSsl A flag to tell if it's a SSL connection or not.
+     */
+    public LdapConnectionImpl( boolean ssl )
+    {
+        setSsl( ssl );
+        ldapPort = ( ssl ? DEFAULT_LDAPS_PORT : DEFAULT_LDAP_PORT );
+        ldapHost = DEFAULT_LDAP_HOST;
+        messageId = 1;
+        operationMutex = new Semaphore(1);
+    }
+    
+    
+    /**
+     * Create a new instance of a LdapConnection on a given
+     * server, using the default port (389)
+     *
+     * @param server The server we want to be connected to
+     * @param port The port the server is listening to
+     */
+    public LdapConnectionImpl( String server )
+    {
+        setSsl( false );
+        ldapPort = DEFAULT_LDAP_PORT;
+        ldapHost = server;
+        messageId = 1;
+        operationMutex = new Semaphore(1);
+    }
+    
+    
+    /**
+     * Create a new instance of a LdapConnection on a given
+     * server, using the default port (389)
+     *
+     * @param server The server we want to be connected to
+     * @param port The port the server is listening to
+     * @param useSsl A flag to tell if it's a SSL connection or not.
+     */
+    public LdapConnectionImpl( String server, boolean ssl )
+    {
+        setSsl( ssl );
+        ldapPort = ( ssl ? DEFAULT_LDAPS_PORT : DEFAULT_LDAP_PORT );
+        ldapHost = DEFAULT_LDAP_HOST;
+        messageId = 1;
+        operationMutex = new Semaphore(1);
+    }
+    
+    
+    /**
+     * Create a new instance of a LdapConnection
+     *
+     * @param server The server we want to be connected to
+     * @param port The port the server is listening to
+     */
+    public LdapConnectionImpl( String server, int port )
+    {
+        setSsl( false );
+        ldapPort = port;
+        ldapHost = server;
+        messageId = 1;
+        operationMutex = new Semaphore(1);
+    }
+    
+    
+    /**
+     * Create a new instance of a LdapConnection
+     *
+     * @param server The server we want to be connected to
+     * @param port The port the server is listening to
+     * @param useSsl A flag to tell if it's a SSL connection or not.
+     */
+    public LdapConnectionImpl( String server, int port, boolean ssl )
+    {
+        setSsl( ssl );
+        ldapPort = port;
+        ldapHost = server;
+        messageId = 1;
+        operationMutex = new Semaphore(1);
+    }
+    
+    
+    /**
+     * Set the timeOut for the responses. We wont wait longer than this 
+     * value.
+     *
+     * @param timeOut The timeout, in milliseconds
+     */
+    public void setTimeOut( long timeOut )
+    {
+        this.timeOut = timeOut;
+    }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public boolean connect() throws IOException 
+    {
+        // Create the connector
+        connector = new NioSocketConnector();
+        
+        if ( ( ldapSession != null ) && ldapSession.isConnected() ) 
+        {
+            throw new IllegalStateException( "Already connected. Disconnect first." );
+        }
+
+        // Add the codec to the chain
+        connector.getFilterChain().addLast( "ldapCodec", ldapProtocolFilter );
+
+        // If we use SSL, we have to add the SslFilter to the chain
+        if (useSsl) 
+        {
+            SSLContext sslContext = null; // BogusSslContextFactory.getInstance( false );
+            SslFilter sslFilter = new SslFilter( sslContext );
+            sslFilter.setUseClientMode(true);
+            connector.getFilterChain().addLast( "sslFilter", sslFilter );
+        }
+
+        // Inject the protocolHandler
+        connector.setHandler( this );
+        
+        // Build the connection address
+        SocketAddress address = new InetSocketAddress( ldapHost , ldapPort );
+        
+        // And create the connection future
+        ConnectFuture connectionFuture = connector.connect( address );
+        
+        // Wait until it's established
+        connectionFuture.awaitUninterruptibly();
+        
+        if ( !connectionFuture.isConnected() ) 
+        {
+            return false;
+        }
+        
+        // Get back the session
+        ldapSession = connectionFuture.getSession();
+        
+        // And inject the current Ldap container into the session
+        IAsn1Container ldapMessageContainer = new LdapMessageContainer();
+        
+        // Store the container into the session 
+        ldapSession.setAttribute( "LDAP-Container", ldapMessageContainer );
+        
+        // Create the responses queue
+        responseQueue = new LinkedBlockingQueue<LdapMessage>();
+        
+        // And return
+        return true;
+    }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public boolean close() throws IOException 
+    {
+        // Release the connector
+        connector.dispose();
+        
+        return true;
+    }
+    
+    
+    //------------------------ The LDAP operations ------------------------//
+    // Bind operations                                                     //
+    //---------------------------------------------------------------------//
+    /**
+     * Anonymous Bind on a server. 
+     *
+     * @return The BindResponse LdapResponse 
+     */
+    public LdapResponse bind() throws Exception
+    {
+        LOG.debug( " Unauthenticated Authentication bind" );
+        
+        LdapResponse response = bind( (String)null, (byte[])null );
+        
+        if (response.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
+        {
+            LOG.debug( " Unauthenticated Authentication bind successfull" );
+        }
+        else
+        {
+            LOG.debug( " Unauthenticated Authentication bind failure {}", response );
+        }
+        
+        return response;
+    }
+    
+    
+    /**
+     * An Unauthenticated Authentication Bind on a server. (cf RFC 4513,
+     * par 5.1.2)
+     *
+     * @param name The name we use to authenticate the user. It must be a 
+     * valid DN
+     * @return The BindResponse LdapResponse 
+     */
+    public LdapResponse bind( String name ) throws Exception
+    {
+        LOG.debug( "Anonymous bind" );
+        
+        LdapResponse response = bind( name, (byte[])null );
+        
+        if (response.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
+        {
+            LOG.debug( "Anonymous bind successfull" );
+        }
+        else
+        {
+            LOG.debug( "Anonymous bind failure {}", response );
+        }
+        
+        return response;
+    }
+    
+    
+    /**
+     * Simple Bind on a server
+     *
+     * @param name The name we use to authenticate the user. It must be a 
+     * valid DN
+     * @param credentials The password.
+     * @return The BindResponse LdapResponse 
+     */
+    public LdapResponse bind( String name, String credentials ) throws Exception
+    {
+        return bind( name, StringTools.getBytesUtf8( credentials ) );
+    }
+    
+    
+    /**
+     * Simple Bind on a server
+     *
+     * @param name The name we use to authenticate the user. It must be a 
+     * valid DN
+     * @param credentials The password.
+     * @return The BindResponse LdapResponse 
+     */
+    public LdapResponse bind( String name, byte[] credentials ) throws Exception
+    {
+        // If the session has not been establish, or is closed, we get out immediately
+        checkSession();
+
+        // Guarantee that for this session, we don't have more than one operation
+        // running at the same time
+        operationMutex.acquire();
+        
+        // Create the BindRequest
+        LdapDN dn = new LdapDN( name );
+        
+        BindRequest bindRequest = new BindRequest();
+        bindRequest.setName( dn );
+        bindRequest.setVersion( LDAP_V3 );
+        
+        // Create the Simple authentication
+        SimpleAuthentication simpleAuth = new SimpleAuthentication();
+        simpleAuth.setSimple( credentials );
+
+        bindRequest.setAuthentication( simpleAuth );
+        
+        // Encode the request
+        LdapMessage message = new LdapMessage();
+        message.setMessageId( messageId++ );
+        message.setProtocolOP( bindRequest );
+        
+        LOG.debug( "-----------------------------------------------------------------" );
+        LOG.debug( "Sending request \n{}", message );
+
+        // Send the request to the server
+        ldapSession.write( message );
+
+        // Read the response, waiting for it if not available immediately
+        LdapMessage response = responseQueue.poll( timeOut, TimeUnit.MILLISECONDS );
+    
+        // Check that we didn't get out because of a timeout
+        if ( response == null )
+        {
+            // We didn't received anything : this is an error
+            operationMutex.release();
+            throw new Exception( "TimeOut occured" );
+        }
+        
+        operationMutex.release();
+        
+        // Everything is fine, return the response
+        return response.getBindResponse();
+    }
+
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void unBind() throws Exception
+    {
+        // If the session has not been establish, or is closed, we get out immediately
+        checkSession();
+        
+        // Guarantee that for this session, we don't have more than one operation
+        // running at the same time
+        operationMutex.acquire();
+        
+        // Create the UnBindRequest
+        UnBindRequest unBindRequest = new UnBindRequest();
+        
+        // Encode the request
+        LdapMessage message = new LdapMessage();
+        message.setMessageId( messageId );
+        message.setProtocolOP( unBindRequest );
+        
+        LOG.debug( "-----------------------------------------------------------------" );
+        LOG.debug( "Sending request \n{}", message );
+        
+        // Send the request to the server
+        ldapSession.write( message );
+
+        // We don't have to wait for a response. Reset the messageId counter to 0
+        messageId = 0;
+        
+        // We also have to reset the response queue
+        responseQueue.clear();
+        
+        operationMutex.release();
+    }
+    
+
+    /**
+     * {@inheritDoc}
+     */
+    public Object[] search( String baseObject, String filterString ) throws Exception
+    {
+        // If the session has not been establish, or is closed, we get out immediately
+        checkSession();
+        
+        LdapDN baseDN = null;
+        Filter filter = null;
+        
+        // Check that the baseObject is not null or empty, 
+        // and is a valid DN
+        if ( StringTools.isEmpty( baseObject ) )
+        {
+            throw new Exception( "Cannot search on RootDSE when the scope is not BASE" );
+        }
+        
+        try
+        {
+            baseDN = new LdapDN( baseObject );
+        }
+        catch ( InvalidNameException ine )
+        {
+            throw new Exception( "The baseObject is not a valid DN" );
+        }
+        
+        // Check that the filter is valid
+        try
+        {
+            ExprNode filterNode = FilterParser.parse( filterString );
+            
+            filter = TwixTransformer.transformFilter( filterNode );
+        }
+        catch ( ParseException pe )
+        {
+            throw new Exception( "The filter is invalid" );
+        }
+        
+        SearchRequest searchRequest = new SearchRequest();
+        searchRequest.setBaseObject( baseDN );
+        searchRequest.setFilter( filter );
+        
+        // Fill the default values
+        searchRequest.setSizeLimit( 0 );
+        searchRequest.setTimeLimit( 0 );
+        searchRequest.setDerefAliases( LdapConstants.DEREF_ALWAYS );
+        searchRequest.setScope( SearchScope.ONELEVEL );
+        searchRequest.setTypesOnly( false );
+        searchRequest.addAttribute( SchemaConstants.ALL_USER_ATTRIBUTES );
+
+        return search( searchRequest );
+    }
+    
+
+    /**
+     * {@inheritDoc}
+     */
+    public Object[] search( SearchRequest searchRequest ) throws Exception
+    {
+        // First check the session
+        checkSession();
+        
+        // Guarantee that for this session, we don't have more than one operation
+        // running at the same time
+        operationMutex.acquire();
+        
+        // Encode the request
+        LdapMessage message = new LdapMessage();
+        message.setMessageId( messageId++ );
+        message.setProtocolOP( searchRequest );
+        message.addControl( searchRequest.getCurrentControl() );
+        
+        LOG.debug( "-----------------------------------------------------------------" );
+        LOG.debug( "Sending request \n{}", message );
+    
+        // Loop and get all the responses
+        // Send the request to the server
+        ldapSession.write( message );
+        
+        int i = 0;
+        
+        List<SearchResultEntry> searchResults = new ArrayList<SearchResultEntry>();
+        
+        // Now wait for the responses
+        // Loop until we got all the responses
+        do
+        {
+            // If we get out before the timeout, check that the response 
+            // is there, and get it
+            LdapMessage response = responseQueue.poll( timeOut, TimeUnit.MILLISECONDS );
+            
+            if ( response == null )
+            {
+                // No response, get out
+                operationMutex.release();
+                
+                // We didn't received anything : this is an error
+                throw new Exception( "TimeOut occured" );
+            }
+            
+            i++;
+
+            // Print the response
+            LOG.info( "Result[" + i + "]" + response );
+            
+            if ( response.getMessageType() == LdapConstants.SEARCH_RESULT_DONE )
+            {
+                SearchResultDone resDone = response.getSearchResultDone();
+                resDone.addControl( response.getCurrentControl() );
+                operationMutex.release();
+                
+                return new Object[]{ searchResults, resDone };
+            }
+            
+            SearchResultEntry sre = response.getSearchResultEntry();
+            sre.addControl( response.getCurrentControl() );
+            
+            searchResults.add( sre  );
+        }
+        while ( true );
+    }
+
+    /**
+     * A helper method to set the useSsl flag
+     * 
+     * @param ssl The ssl value flag to set
+     */
+    private void setSsl( boolean ssl )
+    {
+        this.useSsl = ssl;
+
+        if ( ssl )
+        {
+            ldapPort = DEFAULT_LDAPS_PORT;
+        }
+        else
+        {
+            ldapPort = DEFAULT_LDAP_PORT;
+        }
+    }
+    
+    
+    //--------------------------- Helper methods ---------------------------//
+    /**
+     * Check if the connection is valid : created and connected
+     *
+     * @return <code>true</code> if the session is valid.
+     */
+    private boolean isSessionValid()
+    {
+        return ( ldapSession != null ) && ldapSession.isConnected();
+    }
+
+    
+    /**
+     * Check that a session is valid, ie we can send requests to the
+     * server
+     *
+     * @throws Exception If the session is not valid
+     */
+    private void checkSession() throws Exception
+    {
+        if ( !isSessionValid() )
+        {
+            throw new Exception( "Cannot connect on the server, the connection is invalid" );
+        }
+    }
+    
+    /**
+     * Return the response stored into the current session.
+     *
+     * @return The last request response
+     */
+    public LdapMessage getResponse()
+    {
+        return (LdapMessage)ldapSession.getAttribute( LDAP_RESPONSE );
+    }
+
+    
+    //----------------- ProtocolHandler implemented methods -----------------//
+    
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void sessionCreated( IoSession session ) throws Exception 
+    {
+        LOG.debug( "-------> New session created <-------" );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void sessionOpened( IoSession session ) throws Exception 
+    {
+        LOG.debug( "-------> Session opened <-------" );
+    }
+
+    
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void sessionClosed(IoSession session) throws Exception 
+    {
+        LOG.debug( "-------> Session Closed <-------" );
+    }
+
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void messageReceived( IoSession session, Object message) throws Exception 
+    {
+        // Feed the response and store it into the session
+        LdapMessage response = (LdapMessage)message;
+
+        LOG.debug( "-------> Message received <-------" + response );
+        
+        // Store the response into the responseQueue
+        responseQueue.add( response );
+    }
+    
+    
+}

Added: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolCodecFactory.java?rev=750091&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolCodecFactory.java (added)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolCodecFactory.java Wed Mar  4 18:25:03 2009
@@ -0,0 +1,73 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.mitosis.syncrepl;
+
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+
+/**
+ * 
+ * The factory used to create the LDAP encoder and decoder.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdapProtocolCodecFactory implements ProtocolCodecFactory
+{
+    /** The Ldap encoder */
+    private ProtocolEncoder ldapEncoder;
+    
+    /** The Ldap decoder */
+    private ProtocolDecoder ldapDecoder;
+
+    
+    /**
+     * 
+     * Creates a new instance of LdapProtocolCodecFactory. It
+     * creates the encoded an decoder instances.
+     *
+     */
+    public LdapProtocolCodecFactory()
+    {
+        // Create the encoder.
+        ldapEncoder = new LdapProtocolEncoder();
+        ldapDecoder = new LdapProtocolDecoder();
+    }
+    
+    
+    /**
+     * Get the Ldap decoder. 
+     */
+    public ProtocolDecoder getDecoder( IoSession session ) throws Exception
+    {
+        return ldapDecoder;
+    }
+    
+
+    /**
+     * Get the Ldap encoder.
+     */
+    public ProtocolEncoder getEncoder( IoSession session ) throws Exception
+    {
+        return ldapEncoder;
+    }    
+}

Added: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolDecoder.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolDecoder.java?rev=750091&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolDecoder.java (added)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolDecoder.java Wed Mar  4 18:25:03 2009
@@ -0,0 +1,92 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.mitosis.syncrepl;
+
+import org.apache.directory.shared.asn1.ber.Asn1Decoder;
+import org.apache.directory.shared.asn1.ber.IAsn1Container;
+import org.apache.directory.shared.asn1.codec.DecoderException;
+import org.apache.directory.shared.ldap.codec.LdapDecoder;
+import org.apache.directory.shared.ldap.codec.LdapMessage;
+import org.apache.directory.shared.ldap.codec.LdapMessageContainer;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+/**
+ * 
+ * A LDAP message decoder. It is based on shared-ldap decoder.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdapProtocolDecoder implements ProtocolDecoder
+{
+    /**
+     * Decode a Ldap request and write it to the remote server.
+     * 
+     * @param session The session containing the LdapMessageContainer
+     * @param buffer The ByteBuffer containing the incoming bytes to decode
+     * to a LDAP message
+     * @param out The callback we have to invoke when the message has been decoded 
+     */
+    public void decode( IoSession session, IoBuffer buffer, ProtocolDecoderOutput out ) throws Exception
+    {
+        // Allocate a LdapMessage Container
+        Asn1Decoder ldapDecoder = new LdapDecoder();
+        IAsn1Container ldapMessageContainer = (LdapMessageContainer)session.getAttribute( "LDAP-Container" );
+
+        // Decode a LDAP PDU
+        try
+        {
+            ldapDecoder.decode( buffer.buf(), ldapMessageContainer );
+        }
+        catch ( DecoderException de )
+        {
+            de.printStackTrace();
+        }
+        
+        // get back the decoded message
+        LdapMessage message = ( ( LdapMessageContainer ) ldapMessageContainer ).getLdapMessage();
+        
+        // Clean the container for the next decoding
+        ( ( LdapMessageContainer ) ldapMessageContainer).clean();
+        
+        // Send back the message
+        out.write( message );
+        
+    }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void finishDecode( IoSession session, ProtocolDecoderOutput out ) throws Exception
+    {
+    }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void dispose( IoSession session ) throws Exception
+    {
+    }
+}

Added: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolEncoder.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolEncoder.java?rev=750091&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolEncoder.java (added)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapProtocolEncoder.java Wed Mar  4 18:25:03 2009
@@ -0,0 +1,75 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.mitosis.syncrepl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.directory.shared.ldap.codec.LdapMessage;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+/**
+ * 
+ * A LDAP encoder.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdapProtocolEncoder implements ProtocolEncoder
+{
+    /**
+     * Encode a Ldap request and write it to the remote server.
+     * 
+     * @param session The session containing the LdapMessageContainer
+     * @param request The LDAP message we have to encode to a Byte stream
+     * @param out The callback we have to invoke when the message has been encoded 
+     */
+    public void encode( IoSession session, Object request, ProtocolEncoderOutput out ) throws Exception
+    {
+        if ( request instanceof LdapMessage )
+        {
+            LdapMessage ldapRequest = (LdapMessage)request;
+            ByteBuffer bb = ldapRequest.encode( null );
+            bb.flip();
+            
+            IoBuffer buffer = IoBuffer.allocate( bb.limit(), false );
+            buffer.setAutoExpand( false );
+            buffer.put( bb );
+            buffer.flip();
+            
+            out.write( buffer );
+        }
+        else
+        {
+            throw new Exception();
+        }
+    }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void dispose( IoSession session ) throws Exception
+    {
+        // Nothing to dispose
+    }
+}

Added: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConfiguration.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConfiguration.java?rev=750091&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConfiguration.java (added)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConfiguration.java Wed Mar  4 18:25:03 2009
@@ -0,0 +1,265 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.mitosis.syncrepl;
+
+import org.apache.directory.shared.ldap.filter.SearchScope;
+
+/**
+ * 
+ * A class for holding the syncrepl consumer's configuration.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class SyncreplConfiguration
+{
+    /** host name of the syncrepl provider server */
+    private String providerHost;
+
+    /** port number of the syncrepl provider server */
+    private int port;
+
+    /** bind dn */
+    private String bindDn;
+
+    /** password for binding with bind dn */
+    private String credentials;
+
+    /** flag to represent refresh and persist or refreh only mode */
+    private boolean refreshPersist = false;
+
+    /** time interval for successive sync requests */
+    private long consumerInterval = 60 * 1000;
+
+    /** the base DN whose content will be searched for syncing */
+    private String baseDn;
+
+    /** the ldap filter for fetching the entries */
+    private String filter;
+
+    /** a comma separated string of attribute names */
+    private String attributes;
+
+    /** the numer for setting the limit on numer of search results to be fteched
+     * default value is 0 (i.e no limit) */
+    private int searchSizeLimit = 0;
+
+    /** the timeout value to be used while doing a search 
+     * default value is 0 (i.e no limit)*/
+    private int searchTimeout = 0;
+
+    /** the search scope */
+    private int searchScope = SearchScope.ONELEVEL.getJndiScope();
+
+    /**
+     * @return the providerHost
+     */
+    public String getProviderHost()
+    {
+        return providerHost;
+    }
+
+    /**
+     * @param providerHost the providerHost to set
+     */
+    public void setProviderHost( String providerHost )
+    {
+        this.providerHost = providerHost;
+    }
+
+    /**
+     * @return the port
+     */
+    public int getPort()
+    {
+        return port;
+    }
+
+    /**
+     * @param port the port to set
+     */
+    public void setPort( int port )
+    {
+        this.port = port;
+    }
+
+    /**
+     * @return the bindDn
+     */
+    public String getBindDn()
+    {
+        return bindDn;
+    }
+
+    /**
+     * @param bindDn the bindDn to set
+     */
+    public void setBindDn( String bindDn )
+    {
+        this.bindDn = bindDn;
+    }
+
+    /**
+     * @return the credentials
+     */
+    public String getCredentials()
+    {
+        return credentials;
+    }
+
+    /**
+     * @param credentials the credentials to set
+     */
+    public void setCredentials( String credentials )
+    {
+        this.credentials = credentials;
+    }
+
+    /**
+     * @return the refreshPersist
+     */
+    public boolean isRefreshPersist()
+    {
+        return refreshPersist;
+    }
+
+    /**
+     * @param refreshPersist the refreshPersist to set
+     */
+    public void setRefreshPersist( boolean refreshPersist )
+    {
+        this.refreshPersist = refreshPersist;
+    }
+
+    /**
+     * @return the consumerInterval
+     */
+    public long getConsumerInterval()
+    {
+        return consumerInterval;
+    }
+
+    /**
+     * @param consumerInterval the consumerInterval to set
+     */
+    public void setConsumerInterval( long consumerInterval )
+    {
+        this.consumerInterval = consumerInterval;
+    }
+
+    /**
+     * @return the baseDn
+     */
+    public String getBaseDn()
+    {
+        return baseDn;
+    }
+
+    /**
+     * @param baseDn the baseDn to set
+     */
+    public void setBaseDn( String baseDn )
+    {
+        this.baseDn = baseDn;
+    }
+
+    /**
+     * @return the filter
+     */
+    public String getFilter()
+    {
+        return filter;
+    }
+
+    /**
+     * @param filter the filter to set
+     */
+    public void setFilter( String filter )
+    {
+        this.filter = filter;
+    }
+
+    /**
+     * @return the attributes
+     */
+    public String getAttributes()
+    {
+        return attributes;
+    }
+
+    /**
+     * @param attributes the attributes to set
+     */
+    public void setAttributes( String attributes )
+    {
+        this.attributes = attributes;
+    }
+
+    /**
+     * @return the searchSizeLimit
+     */
+    public int getSearchSizeLimit()
+    {
+        return searchSizeLimit;
+    }
+
+    /**
+     * @param searchSizeLimit the searchSizeLimit to set
+     */
+    public void setSearchSizeLimit( int searchSizeLimit )
+    {
+        this.searchSizeLimit = searchSizeLimit;
+    }
+
+    /**
+     * @return the searchTimeout
+     */
+    public int getSearchTimeout()
+    {
+        return searchTimeout;
+    }
+
+    /**
+     * @param searchTimeout the searchTimeout to set
+     */
+    public void setSearchTimeout( int searchTimeout )
+    {
+        this.searchTimeout = searchTimeout;
+    }
+
+    /**
+     * @return the searchScope
+     */
+    public int getSearchScope()
+    {
+        return searchScope;
+    }
+
+    /**
+     * @param searchScope the searchScope to set
+     */
+    public void setSearchScope( int searchScope )
+    {
+        this.searchScope = searchScope;
+    }
+
+    
+    
+}

Added: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java?rev=750091&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java (added)
+++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java Wed Mar  4 18:25:03 2009
@@ -0,0 +1,464 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.mitosis.syncrepl;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.directory.server.core.DefaultDirectoryService;
+import org.apache.directory.server.core.DirectoryService;
+import org.apache.directory.server.core.entry.DefaultServerEntry;
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.partition.Partition;
+import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmPartition;
+import org.apache.directory.server.ldap.LdapService;
+import org.apache.directory.server.ldap.handlers.extended.StartTlsHandler;
+import org.apache.directory.server.ldap.handlers.extended.StoredProcedureExtendedOperationHandler;
+import org.apache.directory.server.protocol.shared.transport.TcpTransport;
+import org.apache.directory.shared.ldap.codec.Control;
+import org.apache.directory.shared.ldap.codec.LdapResponse;
+import org.apache.directory.shared.ldap.codec.LdapResult;
+import org.apache.directory.shared.ldap.codec.TwixTransformer;
+import org.apache.directory.shared.ldap.codec.controls.replication.syncDoneValue.SyncDoneValueControlCodec;
+import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlCodec;
+import org.apache.directory.shared.ldap.codec.search.SearchRequest;
+import org.apache.directory.shared.ldap.codec.search.SearchResultDone;
+import org.apache.directory.shared.ldap.codec.search.SearchResultEntry;
+import org.apache.directory.shared.ldap.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.entry.Entry;
+import org.apache.directory.shared.ldap.filter.ExprNode;
+import org.apache.directory.shared.ldap.filter.FilterParser;
+import org.apache.directory.shared.ldap.filter.SearchScope;
+import org.apache.directory.shared.ldap.message.ResultCodeEnum;
+import org.apache.directory.shared.ldap.message.control.replication.SyncRequestValueControl;
+import org.apache.directory.shared.ldap.message.control.replication.SyncStateTypeEnum;
+import org.apache.directory.shared.ldap.message.control.replication.SynchronizationModeEnum;
+import org.apache.directory.shared.ldap.name.LdapDN;
+import org.apache.directory.shared.ldap.util.StringTools;
+import org.apache.mina.util.AvailablePortFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * 
+ * An agent capable of communicate with some LDAP servers.
+ * 
+ * TODO write test cases
+ * 
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class SyncreplConsumer
+{
+
+    /** the syncrepl configuration */
+    private SyncreplConfiguration config;
+    
+    /** the sync cookie sent by the server */
+    private byte[] syncCookie;
+
+    /** the logger */
+    private static final Logger LOG = LoggerFactory.getLogger( SyncreplConsumer.class );
+
+    /** conection to the syncrepl provider */
+    private LdapConnection connection;
+
+    /** the search request with control */
+    private SearchRequest searchRequest;
+
+    /** the syncrequest control */
+    private SyncRequestValueControl syncReq;
+
+    /** a reference to the directoryservice */
+    private DirectoryService directoryService;
+
+    
+    /**
+     * @return the config
+     */
+    public SyncreplConfiguration getConfig()
+    {
+        return config;
+    }
+
+
+    /**
+     * @param config the config to set
+     */
+    public void setConfig( SyncreplConfiguration config )
+    {
+        this.config = config;
+    }
+
+
+    /**
+     * A helper method to quickly quit the program
+     */
+    private static void quit( LdapConnection connection ) throws IOException
+    {
+        connection.close();
+        System.exit( 1 );
+    }
+
+
+    /**
+     * A helper method to check that we didn't get an error.
+     */
+    private static void checkldapResult( LdapConnection connection, LdapResult ldapResult ) throws IOException
+    {
+        if ( ldapResult.getResultCode() != ResultCodeEnum.SUCCESS )
+        {
+            System.out.println( "failed to bind on the server : " + ldapResult );
+            quit( connection );
+        }
+    }
+
+
+    public void init( DirectoryService directoryservice )
+    {
+        this.directoryService = directoryservice;
+    }
+
+
+    public boolean connect()
+    {
+        String providerHost = config.getProviderHost();
+        int port = config.getPort();
+
+        // Create a connection
+        connection = new LdapConnectionImpl( providerHost, port );
+
+        try
+        {
+            // Connect to the server
+            boolean connected = connection.connect();
+
+            if ( !connected )
+            {
+                LOG.warn( "Failed to connect to the syncrepl provder host {} running at port {}", providerHost, port );
+                // FIXME rmove this at the time of integration with ADS
+                //System.exit( 2 );
+            }
+            else
+            {
+                return connected;
+            }
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to connect to the syncrepl provder host {} running at port {}", providerHost, port );
+            LOG.error( e.getMessage(), e );
+        }
+
+        return false;
+    }
+
+
+    public boolean bind()
+    {
+        try
+        {
+            // Do a bind
+            LdapResponse bindResponse = connection.bind( config.getBindDn(), config.getCredentials() );
+
+            // Check that it' not null and valid
+            if ( bindResponse == null )
+            {
+                LOG.error( "Failed to bind with the given bindDN and credentials", bindResponse );
+                return false;
+            }
+
+            // Now get the result
+            LdapResult ldapResult = bindResponse.getLdapResult();
+
+            if ( ldapResult.getResultCode() != ResultCodeEnum.SUCCESS )
+            {
+                LOG.warn( "Failed to bind on the server : {}", ldapResult );
+            }
+            else
+            {
+                return true;
+            }
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to bind with the given bindDN and credentials", e );
+        }
+
+        return false;
+    }
+
+
+    public void prepareSyncSearchRequest()
+    {
+
+        String baseDn = config.getBaseDn();
+        
+        searchRequest = new SearchRequest();
+        try
+        {
+            searchRequest.setBaseObject( new LdapDN( baseDn ) );
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Invalid base DN {}", baseDn );
+            LOG.error( e.getMessage(), e );
+            searchRequest = null;
+            return;
+        }
+
+        try
+        {
+            ExprNode filterNode = FilterParser.parse( config.getFilter() );
+            searchRequest.setFilter( TwixTransformer.transformFilter( filterNode ) );
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to parse the filter expression {}", config.getFilter() );
+            LOG.error( e.getMessage(), e );
+            searchRequest = null;
+            return;
+        }
+
+        searchRequest.setSizeLimit( config.getSearchSizeLimit() );
+        searchRequest.setTimeLimit( config.getSearchTimeout() );
+        //TODO openLdap cries if this flag is set 
+        // searchRequest.setDerefAliases( LdapConstants.DEREF_ALWAYS );
+        searchRequest.setScope( SearchScope.getSearchScope( config.getSearchScope() ) );
+        searchRequest.setTypesOnly( false );
+
+        String attributes = config.getAttributes();
+        if ( attributes == null || attributes.trim().length() == 0 )
+        {
+            searchRequest.addAttribute( SchemaConstants.ALL_USER_ATTRIBUTES );
+        }
+        else
+        {
+            String[] attrs = attributes.trim().split( "," );
+            for ( String s : attrs )
+            {
+                s = s.trim();
+                if ( s.length() > 0 )
+                {
+                    searchRequest.addAttribute( s );
+                }
+            }
+        }
+
+        syncReq = new SyncRequestValueControl();
+
+        if ( config.isRefreshPersist() )
+        {
+            syncReq.setMode( SynchronizationModeEnum.REFRESH_AND_PERSIST );
+        }
+        else
+        {
+            syncReq.setMode( SynchronizationModeEnum.REFRESH_ONLY );
+        }
+
+        syncReq.setReloadHint( false );
+
+        Control control = new Control();
+        control.setControlType( SyncRequestValueControl.CONTROL_OID );
+        control.setCriticality( syncReq.isCritical() );
+        control.setControlValue( syncReq.getEncodedValue() );
+
+        searchRequest.addControl( control );
+    }
+
+
+    /**
+     * starts the syn operation
+     * 
+     * TODO should run in a separate thread
+     * 
+     */
+    public void startSync()
+    {
+        if ( searchRequest == null )
+        {
+            return;
+        }
+
+        while ( true )
+        {
+            if ( syncCookie != null )
+            {
+                syncReq.setCookie( syncCookie );
+                searchRequest.getCurrentControl().setControlValue( syncReq.getEncodedValue() );
+            }
+            
+            try
+            {
+                System.out.println( "searching with searchRequest..." );
+                Object[] result = connection.search( searchRequest );
+                
+                SearchResultDone searchDone = ( SearchResultDone ) result[1];
+                
+                SyncDoneValueControlCodec syncDoneCtrl = ( SyncDoneValueControlCodec ) searchDone
+                .getCurrentControl().getControlValue();
+                syncCookie = syncDoneCtrl.getCookie();
+                LOG.info( "synccookie {}", StringTools.utf8ToString( syncCookie ) );
+                
+                List<SearchResultEntry> syncResList = ( List<SearchResultEntry> ) result[0];
+                
+                if ( syncResList != null )
+                {
+                    System.out.println( "sync state results..." + syncResList.size() );
+                    for ( SearchResultEntry entry : syncResList )
+                    {
+                        Entry clientEntry = entry.getEntry();
+                        SyncStateValueControlCodec syncStateCtrl = ( SyncStateValueControlCodec ) entry
+                        .getCurrentControl().getControlValue();
+                        
+                        SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
+                        
+                        System.out.println( "==================" + state.name() + ": " + clientEntry.getDn() );
+                        
+                        if ( state == SyncStateTypeEnum.ADD )
+                        {
+                            directoryService.getAdminSession().add(
+                                new DefaultServerEntry( directoryService.getRegistries(), clientEntry ) );
+                        }
+                        else if ( state == SyncStateTypeEnum.DELETE )
+                        {
+                            directoryService.getAdminSession().delete( clientEntry.getDn() );
+                        }
+                        else if ( state == SyncStateTypeEnum.MODIFY )
+                        {
+                            LOG.error( "FIXME yet to implement modification" );
+                        }
+                    }
+                }
+                
+                Thread.sleep( config.getConsumerInterval() );
+                
+                LOG.info( "--------------------- syncing again ------------------" );
+            }
+            catch ( Exception e )
+            {
+                LOG.error( "Failed to sync", e );
+                //FIXME should be removed while integrating
+                System.exit(1);
+            }
+        }// end of while loop
+    }
+
+
+    // -javaagent:~/jip/profile/profile.jar -Dprofile.properties=~/jip/profile/profile-add.properties
+    public void disconnet()
+    {
+        try
+        {
+            connection.unBind();
+            LOG.info( "Unbound from the server {}", config.getProviderHost() );
+
+            connection.close();
+            LOG.info( "Connection closed for the server {}", config.getProviderHost() );
+
+            directoryService.shutdown();
+            LOG.info( "stopped directory service" );
+        }
+        catch ( Exception e )
+        {
+            LOG.error( "Failed to close the connection", e );
+        }
+    }
+
+
+    private void startEmbeddedServer( File workDir )
+    {
+        try
+        {
+            directoryService = new DefaultDirectoryService();
+            directoryService.setShutdownHookEnabled( false );
+            directoryService.setWorkingDirectory( workDir );
+            int consumerPort = AvailablePortFinder.getNextAvailable( 1024 );
+            LdapService ldapService = new LdapService();
+            ldapService.setTcpTransport( new TcpTransport( consumerPort ) );
+            ldapService.setDirectoryService( directoryService );
+
+            LdapDN suffix = new LdapDN( config.getBaseDn() );
+            Partition partition = new JdbmPartition();
+            partition.setSuffix( suffix.getUpName() );
+            partition.setId( "syncrepl" );
+            partition.init( directoryService );
+
+            directoryService.addPartition( partition );
+
+            directoryService.startup();
+
+            ServerEntry contextEntry = new DefaultServerEntry( directoryService.getRegistries(), suffix );
+            contextEntry.add( "objectclass", "domain" );
+            contextEntry.add( "dc", "my-domain" );
+            directoryService.getSession().add( contextEntry );
+
+            ldapService.addExtendedOperationHandler( new StartTlsHandler() );
+            ldapService.addExtendedOperationHandler( new StoredProcedureExtendedOperationHandler() );
+
+            ldapService.start();
+        }
+        catch ( Exception e )
+        {
+            e.printStackTrace();
+        }
+    }
+
+
+    /**
+     * The main starting point
+     */
+    public static void main( String[] args ) throws Exception
+    {
+        final SyncreplConsumer agent = new SyncreplConsumer();
+     
+        SyncreplConfiguration config = new SyncreplConfiguration();
+        config.setProviderHost( "localhost" );
+        config.setPort( 389 );
+        config.setBindDn( "cn=Manager,dc=my-domain,dc=com" );
+        config.setCredentials( "secret" );
+        config.setBaseDn( "dc=my-domain,dc=com" );
+        config.setFilter( "(objectclass=*)" );
+
+        agent.setConfig( config );
+        
+        final File workDir = new File( System.getProperty( "java.io.tmpdir" ) + "/work" );
+
+        if( workDir.exists() )
+        {
+            FileUtils.forceDelete( workDir );
+        }
+        else
+        {
+            workDir.mkdirs();
+        }
+        
+        agent.startEmbeddedServer( workDir );
+
+        agent.connect();
+        agent.bind();
+        agent.prepareSyncSearchRequest();
+        agent.startSync();
+    }
+}



Mime
View raw message