directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kayyag...@apache.org
Subject svn commit: r1154763 - in /directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication: ReplicaEventLog.java ReplicaEventLogCursor.java ReplicaEventMessage.java SyncReplRequestHandler.java SyncReplSearchListener.java
Date Sun, 07 Aug 2011 20:49:18 GMT
Author: kayyagari
Date: Sun Aug  7 20:49:17 2011
New Revision: 1154763

URL: http://svn.apache.org/viewvc?rev=1154763&view=rev
Log:
o made changes to log the event first before sending to the client for guaranteed delivery
  (the older entries will be purged from log while doing content update)
o pruned the log statements

Modified:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java?rev=1154763&r1=1154762&r2=1154763&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLog.java
Sun Aug  7 20:49:17 2011
@@ -403,14 +403,15 @@ public class ReplicaEventLog implements 
 
 
     /**
+     * @param consumerCsn the consumer's CSN extracted from cookie
      * @return A cursor on top of the queue
      * @throws Exception If the cursor can't be created
      */
-    public ReplicaEventLogCursor getCursor() throws Exception
+    public ReplicaEventLogCursor getCursor( String consumerCsn ) throws Exception
     {
         Queue regionQueue = ( Queue ) brokerService.getRegionBroker().getDestinationMap().get( queue );
         
-        return new ReplicaEventLogCursor( amqSession, queue, regionQueue );
+        return new ReplicaEventLogCursor( amqSession, queue, regionQueue, consumerCsn );
     }
 
 

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java?rev=1154763&r1=1154762&r2=1154763&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventLogCursor.java
Sun Aug  7 20:49:17 2011
@@ -26,6 +26,7 @@ import org.apache.activemq.ActiveMQSessi
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.directory.server.core.event.EventType;
 import org.apache.directory.shared.ldap.model.cursor.AbstractCursor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,18 +48,24 @@ class ReplicaEventLogCursor extends Abst
     /** The queue on top of which we will build the cursor */
     private Queue regionQueue;
 
+    /** the consumer's CSN based on which messages will be qualified for sending */
+    private String consumerCsn;
 
+    private ReplicaEventMessage qualifiedEvtMsg;
+    
     /**
      * Creates a cursor on top of the given queue
      * @param session The session
      * @param queue The queue
      * @param regionQueue ???
+     * @param consumerCsn the consumer's CSN taken from cookie
      * @throws Exception If we can't create a browser on top of the queue
      */
-    public ReplicaEventLogCursor( ActiveMQSession session, ActiveMQQueue queue, Queue regionQueue ) throws Exception
+    public ReplicaEventLogCursor( ActiveMQSession session, ActiveMQQueue queue, Queue regionQueue, String consumerCsn ) throws Exception
     {
         browser = ( ActiveMQQueueBrowser ) session.createBrowser( queue );
         
+        this.consumerCsn = consumerCsn;        
         this.regionQueue = regionQueue;
     }
 
@@ -121,14 +128,43 @@ class ReplicaEventLogCursor extends Abst
      */
     public ReplicaEventMessage get() throws Exception
     {
+        return qualifiedEvtMsg;
+    }
+
+    
+    /**
+     * selects the current queue entry if qualified for sending to the consumer
+     * 
+     * @throws Exception
+     */
+    private void selectQualified() throws Exception
+    {
+        qualifiedEvtMsg = null;
+        
         ActiveMQObjectMessage amqObj = ( ActiveMQObjectMessage ) browser.nextElement();
         LOG.debug( "ReplicaEventMessage: {}", amqObj );
-        ReplicaEventMessage message = ( ReplicaEventMessage ) amqObj.getObject();
-        regionQueue.removeMessage( amqObj.getJMSMessageID() );
+        qualifiedEvtMsg = ( ReplicaEventMessage ) amqObj.getObject();
         
-        return message;
+        if( qualifiedEvtMsg.isEventOlderThan( consumerCsn ) )
+        {
+            if( LOG.isDebugEnabled() )
+            {
+                String evt = "MODDN"; // take this as default cause the event type for MODDN is null
+                
+                EventType evtType = qualifiedEvtMsg.getEventType();
+                if ( evtType != null )
+                {
+                    evt = evtType.name();
+                }
+                
+                LOG.debug( "event {} for dn {} is not qualified for sending", evt, qualifiedEvtMsg.getEntry().getDn() );
+            }
+            
+            regionQueue.removeMessage( amqObj.getJMSMessageID() );
+            qualifiedEvtMsg = null;
+        }
     }
-
+    
 
     /**
      * {@inheritDoc}
@@ -144,7 +180,17 @@ class ReplicaEventLogCursor extends Abst
      */
     public boolean next() throws Exception
     {
-        return browser.hasMoreElements();
+        while( browser.hasMoreElements() )
+        {
+            selectQualified();
+            
+            if ( qualifiedEvtMsg != null )
+            {
+                return true;
+            }
+        }
+        
+        return false;
     }
 
 

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java?rev=1154763&r1=1154762&r2=1154763&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/ReplicaEventMessage.java
Sun Aug  7 20:49:17 2011
@@ -34,6 +34,7 @@ import org.apache.directory.shared.ldap.
 import org.apache.directory.shared.ldap.extras.controls.SyncModifyDn;
 import org.apache.directory.shared.ldap.extras.controls.SyncModifyDnType;
 import org.apache.directory.shared.ldap.extras.controls.syncrepl_impl.SyncModifyDnDecorator;
+import org.apache.directory.shared.ldap.model.constants.SchemaConstants;
 import org.apache.directory.shared.ldap.model.entry.Attribute;
 import org.apache.directory.shared.ldap.model.entry.DefaultAttribute;
 import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
@@ -284,6 +285,22 @@ public class ReplicaEventMessage impleme
         }
     }
 
+    
+    /**
+     * checks if the event's CSN is older than the given CSN
+     *
+     * @param csn the CSN
+     * @return true if the event's CSN is older than the given CSN
+     * @throws Exception if there are any extreme conditions like a null entry or missing entryCSN attribute.
+     */
+    public boolean isEventOlderThan( String csn ) throws Exception
+    {
+        String entryCsn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString();
+        
+        int i = entryCsn.compareTo( csn );
+        
+        return ( i < 0 );
+    }
 
     /**
      * Set the SchemaManager 

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java?rev=1154763&r1=1154762&r2=1154763&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplRequestHandler.java
Sun Aug  7 20:49:17 2011
@@ -291,7 +291,8 @@ public class SyncReplRequestHandler impl
                     }
                     else
                     {
-                        doContentUpdate( session, request, clientMsgLog );
+                        String consumerCsn = getCsn( cookieString );
+                        doContentUpdate( session, request, clientMsgLog, consumerCsn );
                     }
                 }
             }
@@ -308,13 +309,13 @@ public class SyncReplRequestHandler impl
     }
 
 
-    private String sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog )
+    private String sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog, String consumerCsn )
         throws Exception
     {
         // do the search from the log
         String lastSentCsn = clientMsgLog.getLastSentCsn();
 
-        ReplicaEventLogCursor cursor = clientMsgLog.getCursor();
+        ReplicaEventLogCursor cursor = clientMsgLog.getCursor( consumerCsn );
         
         while ( cursor.next() )
         {
@@ -357,7 +358,7 @@ public class SyncReplRequestHandler impl
     }
 
 
-    private void doContentUpdate( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog )
+    private void doContentUpdate( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog, String consumerCsn )
         throws Exception
     {
         boolean refreshNPersist = isRefreshNPersist( req );
@@ -372,7 +373,7 @@ public class SyncReplRequestHandler impl
             handler.setSession( session );
         }
 
-        String lastSentCsn = sendContentFromLog( session, req, replicaLog );
+        String lastSentCsn = sendContentFromLog( session, req, replicaLog, consumerCsn );
 
         byte[] cookie = Strings.getBytesUtf8(replicaLog.getId() + REPLICA_ID_DELIM + lastSentCsn);
 
@@ -477,7 +478,7 @@ public class SyncReplRequestHandler impl
 
             if ( refreshNPersist ) // refreshAndPersist mode
             {
-                contextCsn = sendContentFromLog( session, request, replicaLog );
+                contextCsn = sendContentFromLog( session, request, replicaLog, contextCsn );
                 cookie = Strings.getBytesUtf8(replicaLog.getId() + REPLICA_ID_DELIM + contextCsn);
 
                 IntermediateResponse intermResp = new IntermediateResponseImpl( request.getMessageId() );
@@ -1012,7 +1013,25 @@ public class SyncReplRequestHandler impl
         return Csn.isValid(csnString);
     }
 
+    /**
+     * returns the CSN present in cookie
+     * 
+     * @param cookieString the cookie
+     * @return
+     */
+    private String getCsn( String cookieString )
+    {
+        int pos = cookieString.indexOf( REPLICA_ID_DELIM );
+        return cookieString.substring( pos + 1 );
+    }
 
+    
+    /**
+     * returns the replica id present in cookie
+     * 
+     * @param cookieString  the cookie
+     * @return
+     */
     private int getReplicaId( String cookieString )
     {
         String replicaId = cookieString.substring( 0, cookieString.indexOf( REPLICA_ID_DELIM
) );

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java?rev=1154763&r1=1154762&r2=1154763&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplSearchListener.java
Sun Aug  7 20:49:17 2011
@@ -55,6 +55,8 @@ import org.slf4j.LoggerFactory;
  * A listener associated with the replication system. It does send the modifications to the
  * consumer, if it's connected, or store the data into a queue for a later transmission.
  * 
+ * Note: we always log the entry irrespective of the client's connection status for guaranteed delivery
+ * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
 public class SyncReplSearchListener implements DirectoryListener, AbandonListener
@@ -183,6 +185,7 @@ public class SyncReplSearchListener impl
     {
         searchResultEntry.addControl( syncStateValue );
 
+        LOG.debug( "sending event {} of entry {}", eventType, entry.getDn() );
         WriteFuture future = session.getIoSession().write( searchResultEntry );
 
         // Now, send the entry to the consumer
@@ -200,10 +203,11 @@ public class SyncReplSearchListener impl
     {
         Entry entry = addContext.getEntry();
 
-        LOG.debug( "sending added entry {}", entry.getDn() );
-
         try
         {
+            // we log it first
+            consumerMsgLog.log( new ReplicaEventMessage( EventType.ADD, entry ) );
+
             // We send the added entry directly to the consumer if it's connected
             if ( pushInRealTime )
             {
@@ -217,11 +221,7 @@ public class SyncReplSearchListener impl
                 
                 sendResult( resultEntry, entry, EventType.ADD, syncAdd, null );
             }
-            else
-            {
-                // We are not connected, store the entry into the consumer's queue
-                consumerMsgLog.log( new ReplicaEventMessage( EventType.ADD, addContext.getEntry()
) );
-            }
+            
         }
         catch ( LdapInvalidAttributeValueException e )
         {
@@ -249,10 +249,10 @@ public class SyncReplSearchListener impl
      */
     private void sendDeletedEntry( Entry entry )
     {
-        LOG.debug( "sending deleted entry {}", entry.getDn() );
-
         try
         {
+            consumerMsgLog.log( new ReplicaEventMessage( EventType.DELETE, entry ) );
+            
             if ( pushInRealTime )
             {
                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId()
);
@@ -263,10 +263,6 @@ public class SyncReplSearchListener impl
 
                 sendResult( resultEntry, entry, EventType.DELETE, syncDelete, null );
             }
-            else
-            {
-                consumerMsgLog.log( new ReplicaEventMessage( EventType.DELETE, entry ) );
-            }
         }
         catch ( LdapInvalidAttributeValueException e )
         {
@@ -286,10 +282,10 @@ public class SyncReplSearchListener impl
     {
         Entry alteredEntry = modifyContext.getAlteredEntry();
 
-        LOG.debug( "sending modified entry {}", alteredEntry.getDn() );
-
         try
         {
+            consumerMsgLog.log( new ReplicaEventMessage( EventType.MODIFY, alteredEntry )
);
+            
             if ( pushInRealTime )
             {
 
@@ -301,10 +297,6 @@ public class SyncReplSearchListener impl
 
                 sendResult( resultEntry, alteredEntry, EventType.MODIFY, syncModify, null
);
             }
-            else
-            {
-                consumerMsgLog.log( new ReplicaEventMessage( EventType.MODIFY, modifyContext.getAlteredEntry()
) );
-            }
         }
         catch ( Exception e )
         {
@@ -323,8 +315,6 @@ public class SyncReplSearchListener impl
     {
         Entry entry = moveContext.getOriginalEntry();
 
-        LOG.debug( "sending moved entry {}", entry.getDn() );
-
         try
         {
             if ( !moveContext.getNewSuperior().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase()
) )
@@ -338,6 +328,8 @@ public class SyncReplSearchListener impl
             modDnControl.setEntryDn( moveContext.getDn().getNormName() );
             modDnControl.setNewSuperiorDn( moveContext.getNewSuperior().getNormName() );
 
+            consumerMsgLog.log( new ReplicaEventMessage( modDnControl, entry ) );
+            
             if ( pushInRealTime )
             {
                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId()
);
@@ -349,10 +341,6 @@ public class SyncReplSearchListener impl
 
                 sendResult( resultEntry, entry, null, syncModify, modDnControl );
             }
-            else
-            {
-                consumerMsgLog.log( new ReplicaEventMessage( modDnControl, entry ) );
-            }
         }
         catch ( Exception e )
         {
@@ -369,9 +357,6 @@ public class SyncReplSearchListener impl
      */
     public void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext
)
     {
-
-        LOG.debug( "sending moveAndRenamed entry {}", moveAndRenameContext.getDn() );
-
         try
         {
             if ( !moveAndRenameContext.getNewSuperiorDn().isDescendantOf( consumerMsgLog.getSearchCriteria().getBase()
) )
@@ -387,22 +372,21 @@ public class SyncReplSearchListener impl
             modDnControl.setNewRdn( moveAndRenameContext.getNewRdn().getNormName() );
             modDnControl.setDeleteOldRdn( moveAndRenameContext.getDeleteOldRdn() );
 
+            // should always send the original entry cause the consumer perform the modDn operation there
+            Entry entry = moveAndRenameContext.getOriginalEntry();
+
+            consumerMsgLog.log( new ReplicaEventMessage( modDnControl, entry ) );
+            
             if ( pushInRealTime )
             {
-                Entry alteredEntry = moveAndRenameContext.getModifiedEntry();
-
                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId()
);
-                resultEntry.setObjectName( moveAndRenameContext.getModifiedEntry().getDn()
);
-                resultEntry.setEntry( alteredEntry );
+                resultEntry.setObjectName( entry.getDn() );
+                resultEntry.setEntry( entry );
                 resultEntry.addControl( modDnControl );
 
-                SyncStateValue syncModify = createControl( directoryService, SyncStateTypeEnum.MODDN,
alteredEntry );
+                SyncStateValue syncModify = createControl( directoryService, SyncStateTypeEnum.MODDN,
entry );
 
-                sendResult( resultEntry, alteredEntry, null, syncModify, modDnControl );
-            }
-            else
-            {
-                consumerMsgLog.log( new ReplicaEventMessage( modDnControl, moveAndRenameContext.getEntry()
) );
+                sendResult( resultEntry, entry, null, syncModify, modDnControl );
             }
         }
         catch ( Exception e )
@@ -420,9 +404,7 @@ public class SyncReplSearchListener impl
      */
     public void entryRenamed( RenameOperationContext renameContext )
     {
-        Entry entry = renameContext.getEntry();
-
-        LOG.debug( "sending renamed entry {}", entry.getDn() );
+        Entry entry = renameContext.getOriginalEntry();
 
         try
         {
@@ -432,6 +414,9 @@ public class SyncReplSearchListener impl
             modDnControl.setNewRdn( renameContext.getNewRdn().getName() );
             modDnControl.setDeleteOldRdn( renameContext.getDeleteOldRdn() );
 
+            // should always send the original entry cause the consumer perform the modDn operation there
+            consumerMsgLog.log( new ReplicaEventMessage( modDnControl, entry ) );
+            
             if ( pushInRealTime )
             {
                 SearchResultEntry resultEntry = new SearchResultEntryImpl( searchRequest.getMessageId()
);
@@ -441,16 +426,10 @@ public class SyncReplSearchListener impl
 
                 SyncStateValue syncModify = createControl( directoryService, SyncStateTypeEnum.MODDN,
entry );
                 
-                Entry modifiedEntry = renameContext.getModifiedEntry();
-                
                 // In this case, the cookie is different
-                syncModify.setCookie( getCookie( modifiedEntry ) );
+                syncModify.setCookie( getCookie( entry ) );
 
-                sendResult( resultEntry, modifiedEntry, null, syncModify, modDnControl );
-            }
-            else
-            {
-                consumerMsgLog.log( new ReplicaEventMessage( modDnControl, renameContext.getEntry()
) );
+                sendResult( resultEntry, entry, null, syncModify, modDnControl );
             }
         }
         catch ( Exception e )
@@ -508,15 +487,6 @@ public class SyncReplSearchListener impl
             // set realtime push to false, will be set back to true when the client
             // comes back and sends another request this flag will be set to true
             pushInRealTime = false;
-
-            if ( modDnControl != null )
-            {
-                consumerMsgLog.log( new ReplicaEventMessage( modDnControl, entry ) );
-            }
-            else
-            {
-                consumerMsgLog.log( new ReplicaEventMessage( event, entry ) );
-            }
         }
     }
     



Mime
View raw message