Return-Path: X-Original-To: apmail-directory-commits-archive@www.apache.org Delivered-To: apmail-directory-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D51BD61DC for ; Tue, 2 Aug 2011 22:00:55 +0000 (UTC) Received: (qmail 64778 invoked by uid 500); 2 Aug 2011 22:00:55 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 64713 invoked by uid 500); 2 Aug 2011 22:00:55 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 64706 invoked by uid 99); 2 Aug 2011 22:00:55 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Aug 2011 22:00:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Aug 2011 22:00:51 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DFB0C23889ED for ; Tue, 2 Aug 2011 22:00:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1153286 - /directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java Date: Tue, 02 Aug 2011 22:00:29 -0000 To: commits@directory.apache.org From: elecharny@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110802220029.DFB0C23889ED@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: elecharny Date: Tue Aug 2 22:00:28 2011 New Revision: 1153286 URL: http://svn.apache.org/viewvc?rev=1153286&view=rev Log: o Added the missing Javadoc o Minor refactoring Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java?rev=1153286&r1=1153285&r2=1153286&view=diff ============================================================================== --- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java (original) +++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java Tue Aug 2 22:00:28 2011 @@ -52,8 +52,8 @@ import org.slf4j.LoggerFactory; /** - * modeled after PersistentSearchListener - * NOTE: doco is missing at many parts. Will be added once the functionality is satisfactory + * A listener associated with the replication system. It does send the modifications to the + * consumer, if it's connected, or store the data into a queue for a later transmission. * * @author Apache Directory Project */ @@ -158,6 +158,9 @@ public class SyncReplSearchListener impl } + /** + * Create the SyncStateValue control + */ private SyncStateValue createControl( DirectoryService directoryService, SyncStateTypeEnum operation, Entry entry ) throws LdapInvalidAttributeValueException { @@ -172,6 +175,9 @@ public class SyncReplSearchListener impl } + /** + * Send the result to the consumer. If the consumer has disconnected, we fail back to the queue. + */ private void sendResult( SearchResultEntry searchResultEntry, Entry entry, EventType eventType, SyncStateValue syncStateValue, SyncModifyDn syncModifyDn ) { @@ -179,6 +185,7 @@ public class SyncReplSearchListener impl WriteFuture future = session.getIoSession().write( searchResultEntry ); + // Now, send the entry to the consumer handleWriteFuture( future, entry, eventType, syncModifyDn ); } @@ -201,14 +208,14 @@ public class SyncReplSearchListener impl if ( pushInRealTime ) { // Construct a new SearchResultEntry - SearchResultEntry respEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); - respEntry.setObjectName( entry.getDn() ); - respEntry.setEntry( entry ); + SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); + resultEntry.setObjectName( entry.getDn() ); + resultEntry.setEntry( entry ); // Create the control which will be added to the response. SyncStateValue syncAdd = createControl( directoryService, SyncStateTypeEnum.ADD, entry ); - sendResult( respEntry, entry, EventType.ADD, syncAdd, null ); + sendResult( resultEntry, entry, EventType.ADD, syncAdd, null ); } else { @@ -224,13 +231,22 @@ public class SyncReplSearchListener impl } + /** + * Process a Delete operation. A delete event is send to the consumer, or stored in its + * queue if the consumer is not connected. + * + * @param deleteContext The delete operation context + */ public void entryDeleted( DeleteOperationContext deleteContext ) { Entry entry = deleteContext.getEntry(); sendDeletedEntry( entry ); } - + + /** + * A helper method, as the delete opertaionis used by the ModDN operations. + */ private void sendDeletedEntry( Entry entry ) { LOG.debug( "sending deleted entry {}", entry.getDn() ); @@ -239,13 +255,13 @@ public class SyncReplSearchListener impl { if ( pushInRealTime ) { - SearchResultEntry respEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); - respEntry.setObjectName( entry.getDn() ); - respEntry.setEntry( entry ); + SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); + resultEntry.setObjectName( entry.getDn() ); + resultEntry.setEntry( entry ); SyncStateValue syncDelete = createControl( directoryService, SyncStateTypeEnum.DELETE, entry ); - sendResult( respEntry, entry, EventType.DELETE, syncDelete, null ); + sendResult( resultEntry, entry, EventType.DELETE, syncDelete, null ); } else { @@ -260,6 +276,12 @@ public class SyncReplSearchListener impl } + /** + * Process a Modify operation. A modify event is send to the consumer, or stored in its + * queue if the consumer is not connected. + * + * @param modifyContext The modify operation context + */ public void entryModified( ModifyOperationContext modifyContext ) { Entry alteredEntry = modifyContext.getAlteredEntry(); @@ -271,13 +293,13 @@ public class SyncReplSearchListener impl if ( pushInRealTime ) { - SearchResultEntry respEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); - respEntry.setObjectName( modifyContext.getDn() ); - respEntry.setEntry( alteredEntry ); + SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); + resultEntry.setObjectName( modifyContext.getDn() ); + resultEntry.setEntry( alteredEntry ); SyncStateValue syncModify = createControl( directoryService, SyncStateTypeEnum.MODIFY, alteredEntry ); - sendResult( respEntry, alteredEntry, EventType.MODIFY, syncModify, null ); + sendResult( resultEntry, alteredEntry, EventType.MODIFY, syncModify, null ); } else { @@ -291,6 +313,12 @@ public class SyncReplSearchListener impl } + /** + * Process a Move operation. A MODDN event is send to the consumer, or stored in its + * queue if the consumer is not connected. + * + * @param moveContext The move operation context + */ public void entryMoved( MoveOperationContext moveContext ) { Entry entry = moveContext.getOriginalEntry(); @@ -312,14 +340,14 @@ public class SyncReplSearchListener impl if ( pushInRealTime ) { - SearchResultEntry respEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); - respEntry.setObjectName( moveContext.getDn() ); - respEntry.setEntry( entry ); - respEntry.addControl( modDnControl ); + SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); + resultEntry.setObjectName( moveContext.getDn() ); + resultEntry.setEntry( entry ); + resultEntry.addControl( modDnControl ); SyncStateValue syncModify = createControl( directoryService, SyncStateTypeEnum.MODDN, entry ); - sendResult( respEntry, entry, null, syncModify, modDnControl ); + sendResult( resultEntry, entry, null, syncModify, modDnControl ); } else { @@ -333,6 +361,12 @@ public class SyncReplSearchListener impl } + /** + * Process a MoveAndRename operation. A MODDN event is send to the consumer, or stored in its + * queue if the consumer is not connected. + * + * @param moveAndRenameContext The move and rename operation context + */ public void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext ) { @@ -357,14 +391,14 @@ public class SyncReplSearchListener impl { Entry alteredEntry = moveAndRenameContext.getModifiedEntry(); - SearchResultEntry respEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); - respEntry.setObjectName( moveAndRenameContext.getModifiedEntry().getDn() ); - respEntry.setEntry( alteredEntry ); - respEntry.addControl( modDnControl ); + SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); + resultEntry.setObjectName( moveAndRenameContext.getModifiedEntry().getDn() ); + resultEntry.setEntry( alteredEntry ); + resultEntry.addControl( modDnControl ); SyncStateValue syncModify = createControl( directoryService, SyncStateTypeEnum.MODDN, alteredEntry ); - sendResult( respEntry, alteredEntry, null, syncModify, modDnControl ); + sendResult( resultEntry, alteredEntry, null, syncModify, modDnControl ); } else { @@ -378,6 +412,12 @@ public class SyncReplSearchListener impl } + /** + * Process a Rename operation. A MODDN event is send to the consumer, or stored in its + * queue if the consumer is not connected. + * + * @param renameContext The rename operation context + */ public void entryRenamed( RenameOperationContext renameContext ) { Entry entry = renameContext.getEntry(); @@ -394,10 +434,10 @@ public class SyncReplSearchListener impl if ( pushInRealTime ) { - SearchResultEntry respEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); - respEntry.setObjectName( entry.getDn() ); - respEntry.setEntry( entry ); - respEntry.addControl( modDnControl ); + SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId() ); + resultEntry.setObjectName( entry.getDn() ); + resultEntry.setEntry( entry ); + resultEntry.addControl( modDnControl ); SyncStateValue syncModify = createControl( directoryService, SyncStateTypeEnum.MODDN, entry ); @@ -406,7 +446,7 @@ public class SyncReplSearchListener impl // In this case, the cookie is different syncModify.setCookie( getCookie( modifiedEntry ) ); - sendResult( respEntry, modifiedEntry, null, syncModify, modDnControl ); + sendResult( resultEntry, modifiedEntry, null, syncModify, modDnControl ); } else { @@ -420,28 +460,44 @@ public class SyncReplSearchListener impl } + /** + * @return true if the entries are sent to the consumer in real time + */ public boolean isPushInRealTime() { return pushInRealTime; } + /** + * Set the pushInRealTime parameter + * @param pushInRealTime true if the entries must be push to the consumer directly + */ public void setPushInRealTime( boolean pushInRealTime ) { this.pushInRealTime = pushInRealTime; } + /** + * Get the cookie from the entry + */ private byte[] getCookie( Entry entry ) throws LdapInvalidAttributeValueException { String csn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString(); + return Strings.getBytesUtf8( consumerMsgLog.getId() + SyncReplRequestHandler.REPLICA_ID_DELIM + csn ); } + /** + * Process the writing of the replicated entry to the consumer + */ private void handleWriteFuture( WriteFuture future, Entry entry, EventType event, SyncModifyDn modDnControl ) { - future.awaitUninterruptibly(); + // Let the operation be executed. + // Note : we wait 10 seconds max + future.awaitUninterruptibly( 10000L ); if ( !future.isWritten() ) {