From commits-return-21279-apmail-directory-commits-archive=directory.apache.org@directory.apache.org Thu Mar 05 20:32:33 2009 Return-Path: Delivered-To: apmail-directory-commits-archive@www.apache.org Received: (qmail 24359 invoked from network); 5 Mar 2009 20:32:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 5 Mar 2009 20:32:32 -0000 Received: (qmail 586 invoked by uid 500); 5 Mar 2009 20:32:32 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 555 invoked by uid 500); 5 Mar 2009 20:32:32 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 546 invoked by uid 99); 5 Mar 2009 20:32:32 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2009 12:32:32 -0800 X-ASF-Spam-Status: No, hits=-1998.8 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2009 20:32:24 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7271C238889E; Thu, 5 Mar 2009 20:32:03 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r750576 - in /directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl: LdapConnection.java LdapConnectionImpl.java SyncreplConsumer.java Date: Thu, 05 Mar 2009 20:32:02 -0000 To: commits@directory.apache.org From: kayyagari@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090305203203.7271C238889E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kayyagari Date: Thu Mar 5 20:32:01 2009 New Revision: 750576 URL: http://svn.apache.org/viewvc?rev=750576&view=rev Log: o implemented the new ConsumerCallback interface o added preliminary support for handling the syncinfovalue control Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java?rev=750576&r1=750575&r2=750576&view=diff ============================================================================== --- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java (original) +++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnection.java Thu Mar 5 20:32:01 2009 @@ -76,20 +76,16 @@ * @param baseObject The base for the search. It must be a valid * DN, and can't be emtpy * @param filter The filter to use for this search. It can't be empty - * @return An Object array of size 2 containing the result entries - * at the 0th index and the syncdone value at 1st index */ - Object[] search( String baseObject, String filter ) throws Exception; + void search( String baseObject, String filter ) throws Exception; /** * Send the already built SearchRequest to the server. * * @param searchRequest the SearchRequest object to send to the server - * @return An Object array of size 2 containing the result entries - * at the 0th index and the syncdone value at 1st index */ - Object[] search( SearchRequest searchRequest ) throws Exception; + void search( SearchRequest searchRequest ) throws Exception; /* void search( String baseObject, SearchScope scope, int derefAlias, @@ -121,4 +117,13 @@ * @return The last request response */ LdapMessage getResponse(); + + + /** + * + * adds a type of ConsumerCalllback which can handle the results of a syncrepl search request. + * + * @param consumer an instance of ConsumerCalllback implementation. + */ + void addConsumer( ConsumerCalllback consumer ); } Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java?rev=750576&r1=750575&r2=750576&view=diff ============================================================================== --- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java (original) +++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/LdapConnectionImpl.java Thu Mar 5 20:32:01 2009 @@ -41,6 +41,7 @@ import org.apache.directory.shared.ldap.codec.TwixTransformer; import org.apache.directory.shared.ldap.codec.bind.BindRequest; import org.apache.directory.shared.ldap.codec.bind.SimpleAuthentication; +import org.apache.directory.shared.ldap.codec.intermediate.IntermediateResponse; import org.apache.directory.shared.ldap.codec.search.Filter; import org.apache.directory.shared.ldap.codec.search.SearchRequest; import org.apache.directory.shared.ldap.codec.search.SearchResultDone; @@ -50,6 +51,7 @@ import org.apache.directory.shared.ldap.filter.ExprNode; import org.apache.directory.shared.ldap.filter.FilterParser; import org.apache.directory.shared.ldap.filter.SearchScope; +import org.apache.directory.shared.ldap.message.ExtendedResponseImpl; import org.apache.directory.shared.ldap.message.ResultCodeEnum; import org.apache.directory.shared.ldap.name.LdapDN; import org.apache.directory.shared.ldap.util.StringTools; @@ -124,7 +126,8 @@ /** An operation mutex to guarantee the operation order */ private Semaphore operationMutex; - + /** the agent which created this connection */ + private ConsumerCalllback consumer; //------------------------- The constructors --------------------------// /** @@ -474,7 +477,7 @@ /** * {@inheritDoc} */ - public Object[] search( String baseObject, String filterString ) throws Exception + public void search( String baseObject, String filterString ) throws Exception { // If the session has not been establish, or is closed, we get out immediately checkSession(); @@ -522,14 +525,14 @@ searchRequest.setTypesOnly( false ); searchRequest.addAttribute( SchemaConstants.ALL_USER_ATTRIBUTES ); - return search( searchRequest ); + search( searchRequest ); } /** * {@inheritDoc} */ - public Object[] search( SearchRequest searchRequest ) throws Exception + public void search( SearchRequest searchRequest ) throws Exception { // First check the session checkSession(); @@ -575,15 +578,24 @@ i++; // Print the response - LOG.info( "Result[" + i + "]" + response ); + System.out.println( "Result[" + i + "]" + response ); + System.out.println( "Result[" + i + "]" + response.getMessageType() + ":" + response.getMessageTypeName() ); + + if( response.getMessageType() == LdapConstants.INTERMEDIATE_RESPONSE ) + { + consumer.handleSyncInfo( response.getIntermediateResponse() ); + operationMutex.release(); // FIXME lock will be held for a long time? + return; + } if ( response.getMessageType() == LdapConstants.SEARCH_RESULT_DONE ) { SearchResultDone resDone = response.getSearchResultDone(); resDone.addControl( response.getCurrentControl() ); - operationMutex.release(); - return new Object[]{ searchResults, resDone }; + consumer.handleSearchResult( searchResults, resDone); + operationMutex.release(); // FIXME lock will be held for a long time? + return; } SearchResultEntry sre = response.getSearchResultEntry(); @@ -696,6 +708,15 @@ // Store the response into the responseQueue responseQueue.add( response ); } + + + /** + * {@inheritDoc} + */ + public void addConsumer( ConsumerCalllback consumer ) + { + this.consumer = consumer; + } } Modified: directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java?rev=750576&r1=750575&r2=750576&view=diff ============================================================================== --- directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java (original) +++ directory/apacheds/branches/apacheds-replication/mitosis/src/main/java/org/apache/directory/mitosis/syncrepl/SyncreplConsumer.java Thu Mar 5 20:32:01 2009 @@ -35,12 +35,16 @@ import org.apache.directory.server.ldap.handlers.extended.StartTlsHandler; import org.apache.directory.server.ldap.handlers.extended.StoredProcedureExtendedOperationHandler; import org.apache.directory.server.protocol.shared.transport.TcpTransport; +import org.apache.directory.shared.asn1.codec.DecoderException; import org.apache.directory.shared.ldap.codec.Control; import org.apache.directory.shared.ldap.codec.LdapResponse; import org.apache.directory.shared.ldap.codec.LdapResult; import org.apache.directory.shared.ldap.codec.TwixTransformer; import org.apache.directory.shared.ldap.codec.controls.replication.syncDoneValue.SyncDoneValueControlCodec; +import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueControlCodec; +import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueControlDecoder; import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlCodec; +import org.apache.directory.shared.ldap.codec.intermediate.IntermediateResponse; import org.apache.directory.shared.ldap.codec.search.SearchRequest; import org.apache.directory.shared.ldap.codec.search.SearchResultDone; import org.apache.directory.shared.ldap.codec.search.SearchResultEntry; @@ -69,7 +73,7 @@ * @author Apache Directory Project * @version $Rev$, $Date$ */ -public class SyncreplConsumer +public class SyncreplConsumer implements ConsumerCalllback { /** the syncrepl configuration */ @@ -93,6 +97,8 @@ /** a reference to the directoryservice */ private DirectoryService directoryService; + + private SyncInfoValueControlDecoder decoder = new SyncInfoValueControlDecoder(); /** * @return the config @@ -162,6 +168,7 @@ } else { + connection.addConsumer( this ); return connected; } } @@ -289,6 +296,100 @@ /** + * {@inheritDoc} + * atm does nothinng except examinig and printing the content of syncinfovalue control + */ + public void handleSyncInfo( IntermediateResponse response ) + { + try + { + LOG.info( "============ inside handleSyncInfo ======================" ); + String name = response.getResponseName(); + byte[] value = response.getResponseValue(); + + SyncInfoValueControlCodec syncInfoValue = ( SyncInfoValueControlCodec ) decoder.decode( value ); + + byte[] cookie = syncInfoValue.getCookie(); + + if( cookie != null ) + { + LOG.info( "setting the cookie: " + StringTools.utf8ToString( value ) ); + syncCookie = cookie; + } + + List uuidList = syncInfoValue.getSyncUUIDs(); + + LOG.info( "The uuid list " + uuidList );// receives a list of UUIDs of the entries what to do with them??? + LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() ); + LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() ); + } + catch ( DecoderException de ) + { + LOG.error( "Failed to handle syncinfo message" ); + de.printStackTrace(); + } + + + } + + + /** + * {@inheritDoc} + */ + public void handleSearchResult( List syncResList, SearchResultDone searchDone ) + { + try + { + SyncDoneValueControlCodec syncDoneCtrl = ( SyncDoneValueControlCodec ) searchDone + .getCurrentControl().getControlValue(); + + if( syncDoneCtrl.getCookie() != null ) + { + syncCookie = syncDoneCtrl.getCookie(); + } + { + LOG.info( "cookie in syncdone message is null" ); + } + + LOG.info( "synccookie {}", StringTools.utf8ToString( syncCookie ) ); + + if ( syncResList != null ) + { + System.out.println( "sync state results..." + syncResList.size() ); + for ( SearchResultEntry entry : syncResList ) + { + Entry clientEntry = entry.getEntry(); + SyncStateValueControlCodec syncStateCtrl = ( SyncStateValueControlCodec ) entry + .getCurrentControl().getControlValue(); + + SyncStateTypeEnum state = syncStateCtrl.getSyncStateType(); + + LOG.info( state.name() + ": " + clientEntry.getDn() ); + + if ( state == SyncStateTypeEnum.ADD ) + { + directoryService.getAdminSession().add( + new DefaultServerEntry( directoryService.getRegistries(), clientEntry ) ); + } + else if ( state == SyncStateTypeEnum.DELETE ) + { + directoryService.getAdminSession().delete( clientEntry.getDn() ); + } + else if ( state == SyncStateTypeEnum.MODIFY ) + { + LOG.error( "FIXME yet to implement modification" ); + } + } + } + } + catch( Exception e ) + { + LOG.error( e.getMessage(), e ); + } + } + + + /** * starts the syn operation * * TODO should run in a separate thread @@ -312,45 +413,7 @@ try { System.out.println( "searching with searchRequest..." ); - Object[] result = connection.search( searchRequest ); - - SearchResultDone searchDone = ( SearchResultDone ) result[1]; - - SyncDoneValueControlCodec syncDoneCtrl = ( SyncDoneValueControlCodec ) searchDone - .getCurrentControl().getControlValue(); - syncCookie = syncDoneCtrl.getCookie(); - LOG.info( "synccookie {}", StringTools.utf8ToString( syncCookie ) ); - - List syncResList = ( List ) result[0]; - - if ( syncResList != null ) - { - System.out.println( "sync state results..." + syncResList.size() ); - for ( SearchResultEntry entry : syncResList ) - { - Entry clientEntry = entry.getEntry(); - SyncStateValueControlCodec syncStateCtrl = ( SyncStateValueControlCodec ) entry - .getCurrentControl().getControlValue(); - - SyncStateTypeEnum state = syncStateCtrl.getSyncStateType(); - - System.out.println( "==================" + state.name() + ": " + clientEntry.getDn() ); - - if ( state == SyncStateTypeEnum.ADD ) - { - directoryService.getAdminSession().add( - new DefaultServerEntry( directoryService.getRegistries(), clientEntry ) ); - } - else if ( state == SyncStateTypeEnum.DELETE ) - { - directoryService.getAdminSession().delete( clientEntry.getDn() ); - } - else if ( state == SyncStateTypeEnum.MODIFY ) - { - LOG.error( "FIXME yet to implement modification" ); - } - } - } + connection.search( searchRequest ); Thread.sleep( config.getConsumerInterval() ); @@ -400,9 +463,10 @@ ldapService.setDirectoryService( directoryService ); LdapDN suffix = new LdapDN( config.getBaseDn() ); - Partition partition = new JdbmPartition(); + JdbmPartition partition = new JdbmPartition(); partition.setSuffix( suffix.getUpName() ); partition.setId( "syncrepl" ); + partition.setSyncOnWrite( true ); partition.init( directoryService ); directoryService.addPartition( partition );