directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From elecha...@apache.org
Subject svn commit: r1432912 - /directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java
Date Mon, 14 Jan 2013 13:21:22 GMT
Author: elecharny
Date: Mon Jan 14 13:21:22 2013
New Revision: 1432912

URL: http://svn.apache.org/viewvc?rev=1432912&view=rev
Log:
A few renames, plus formatting

Modified:
    directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java

Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java
URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java?rev=1432912&r1=1432911&r2=1432912&view=diff
==============================================================================
--- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java
(original)
+++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java
Mon Jan 14 13:21:22 2013
@@ -98,6 +98,7 @@ import org.apache.directory.server.ldap.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Class used to process the incoming synchronization request from the consumers.
  *
@@ -123,10 +124,10 @@ public class SyncReplRequestHandler impl
 
     /** An ObjectClass AT instance */
     private static AttributeType OBJECT_CLASS_AT;
-    
+
     /** The CSN AttributeType instance */
     private AttributeType CSN_AT;
-    
+
     private Map<Integer, ReplicaEventLog> replicaLogMap = new HashMap<Integer, ReplicaEventLog>();
 
     private File syncReplData;
@@ -136,9 +137,10 @@ public class SyncReplRequestHandler impl
     private ReplConsumerManager replicaUtil;
 
     private ConsumerLogEntryDeleteListener cledListener;
-    
+
     private ReplicaEventLogJanitor logJanitor;
-    
+
+
     /**
      * Create a SyncReplRequestHandler empty instance 
      */
@@ -157,7 +159,7 @@ public class SyncReplRequestHandler impl
         {
             LOG.warn( "syncrepl provider was already initialized" );
             PROVIDER_LOG.warn( "syncrepl provider was already initialized" );
-            
+
             return;
         }
 
@@ -168,10 +170,10 @@ public class SyncReplRequestHandler impl
 
             this.ldapServer = server;
             this.dirService = server.getDirectoryService();
-            
+
             CSN_AT = dirService.getSchemaManager()
                 .lookupAttributeTypeRegistry( SchemaConstants.ENTRY_CSN_AT );
-            
+
             OBJECT_CLASS_AT = dirService.getSchemaManager()
                 .lookupAttributeTypeRegistry( SchemaConstants.OBJECT_CLASS_AT );
 
@@ -193,16 +195,16 @@ public class SyncReplRequestHandler impl
 
             logJanitor = new ReplicaEventLogJanitor( replicaLogMap );
             logJanitor.start();
-            
+
             registerPersistentSearches();
 
             cledListener = new ConsumerLogEntryDeleteListener();
             NotificationCriteria criteria = new NotificationCriteria();
             criteria.setBase( new Dn( dirService.getSchemaManager(), ServerDNConstants.REPL_CONSUMER_DN_STR
) );
             criteria.setEventMask( EventType.DELETE );
-            
+
             dirService.getEventService().addListener( cledListener, criteria );
-            
+
             Thread consumerInfoUpdateThread = new Thread( createConsumerInfoUpdateTask()
);
             consumerInfoUpdateThread.setDaemon( true );
             consumerInfoUpdateThread.start();
@@ -226,9 +228,9 @@ public class SyncReplRequestHandler impl
     public void stop()
     {
         logJanitor.stopCleaning();
-        
+
         EventService evtSrv = dirService.getEventService();
-        
+
         for ( ReplicaEventLog log : replicaLogMap.values() )
         {
             try
@@ -243,9 +245,9 @@ public class SyncReplRequestHandler impl
                 PROVIDER_LOG.error( "Failed to close the event log {}", log.getId(), e );
             }
         }
-        
+
         evtSrv.removeListener( cledListener );
-        
+
         initialized = false;
     }
 
@@ -260,12 +262,12 @@ public class SyncReplRequestHandler impl
     {
         try
         {
-            if( !request.getAttributes().contains( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES
) )
+            if ( !request.getAttributes().contains( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES
) )
             {
                 // this is needed for accessing entryUUID and entryCSN attributes for internal
purpose
                 request.addAttributes( SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES );
             }
-            
+
             // First extract the Sync control from the request
             SyncRequestValue syncControl = ( SyncRequestValue ) request.getControls().get(
                 SyncRequestValue.OID );
@@ -331,14 +333,15 @@ public class SyncReplRequestHandler impl
     /**
      * Send all the stored modifications to the consumer
      */
-    private void sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog
clientMsgLog, String fromCsn )
+    private void sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog
clientMsgLog,
+        String fromCsn )
         throws Exception
     {
         // do the search from the log
         String lastSentCsn = fromCsn;
 
         ReplicaJournalCursor cursor = clientMsgLog.getCursor( fromCsn );
-        
+
         PROVIDER_LOG.debug( "Processing the log for replica {}", clientMsgLog.getId() );
 
         try
@@ -349,38 +352,39 @@ public class SyncReplRequestHandler impl
                 Entry entry = replicaEventMessage.getEntry();
                 LOG.debug( "Read message from the queue {}", entry );
                 PROVIDER_LOG.debug( "Read message from the queue {}", entry );
-                
+
                 lastSentCsn = entry.get( CSN_AT ).getString();
-                
+
                 ChangeType event = replicaEventMessage.getChangeType();
-                
+
                 SyncStateTypeEnum syncStateType = null;
-                
+
                 switch ( event )
                 {
-                    case ADD :
+                    case ADD:
                         syncStateType = SyncStateTypeEnum.ADD;
                         break;
-                        
-                    case MODIFY :
+
+                    case MODIFY:
                         syncStateType = SyncStateTypeEnum.MODIFY;
                         break;
-                        
-                    case MODDN :
+
+                    case MODDN:
                         syncStateType = SyncStateTypeEnum.MODDN;
-                        
-                    case DELETE :
+
+                    case DELETE:
                         syncStateType = SyncStateTypeEnum.DELETE;
                         break;
                 }
-                
+
                 sendSearchResultEntry( session, req, entry, syncStateType );
-                
+
                 clientMsgLog.setLastSentCsn( lastSentCsn );
-                
-                PROVIDER_LOG.debug( "The latest entry sent to the consumer {} has this CSN
: {}", clientMsgLog.getId(), lastSentCsn );
+
+                PROVIDER_LOG.debug( "The latest entry sent to the consumer {} has this CSN
: {}", clientMsgLog.getId(),
+                    lastSentCsn );
             }
-            
+
             PROVIDER_LOG.debug( "All pending modifciations for replica {} processed", clientMsgLog.getId()
);
         }
         finally
@@ -400,7 +404,7 @@ public class SyncReplRequestHandler impl
         synchronized ( replicaLog )
         {
             boolean refreshNPersist = isRefreshNPersist( req );
-            
+
             // if this method is called with refreshAndPersist  
             // means the client was offline after it initiated a persistent synch session
             // we need to update the handler's session 
@@ -410,40 +414,42 @@ public class SyncReplRequestHandler impl
                 handler.setSearchRequest( req );
                 handler.setSession( session );
             }
-            
+
             sendContentFromLog( session, req, replicaLog, consumerCsn );
-            
+
             String lastSentCsn = replicaLog.getLastSentCsn();
-            
+
             byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), lastSentCsn
);
-            
+
             if ( refreshNPersist )
             {
                 IntermediateResponse intermResp = new IntermediateResponseImpl( req.getMessageId()
);
                 intermResp.setResponseName( SyncInfoValue.OID );
-                
+
                 SyncInfoValue syncInfo = new SyncInfoValueDecorator( ldapServer.getDirectoryService()
                     .getLdapCodecService(),
                     SynchronizationInfoEnum.NEW_COOKIE );
                 syncInfo.setCookie( cookie );
-                intermResp.setResponseValue( ((SyncInfoValueDecorator)syncInfo).getValue()
);
-                
-                PROVIDER_LOG.debug( "Sent the intermediate response to the {} consumer, {}",
replicaLog.getId(), intermResp );
+                intermResp.setResponseValue( ( ( SyncInfoValueDecorator ) syncInfo ).getValue()
);
+
+                PROVIDER_LOG.debug( "Sent the intermediate response to the {} consumer, {}",
replicaLog.getId(),
+                    intermResp );
                 session.getIoSession().write( intermResp );
-                
+
                 replicaLog.getPersistentListener().setPushInRealTime( refreshNPersist );
             }
             else
             {
-                SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
+                SearchResultDone searchDoneResp = req.getResultResponse();
                 searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS );
-                SyncDoneValue syncDone = new SyncDoneValueDecorator( 
+                SyncDoneValue syncDone = new SyncDoneValueDecorator(
                     ldapServer.getDirectoryService().getLdapCodecService() );
                 syncDone.setCookie( cookie );
                 searchDoneResp.addControl( syncDone );
-                
-                PROVIDER_LOG.debug( "Send a SearchResultDone response to the {} consumer",
replicaLog.getId(), searchDoneResp );
-                
+
+                PROVIDER_LOG.debug( "Send a SearchResultDone response to the {} consumer",
replicaLog.getId(),
+                    searchDoneResp );
+
                 session.getIoSession().write( searchDoneResp );
             }
         }
@@ -484,8 +490,8 @@ public class SyncReplRequestHandler impl
         // irrespective of the sync mode set the 'isRealtimePush' to false initially so that
we can
         // store the modifications in the queue and later if it is a persist mode
         // we push this queue's content and switch to realtime mode
-        SyncReplSearchListener handler = new SyncReplSearchListener( session, request, replicaLog,
false );
-        replicaLog.setPersistentListener( handler );
+        SyncReplSearchListener replicationListener = new SyncReplSearchListener( session,
request, replicaLog, false );
+        replicaLog.setPersistentListener( replicationListener );
 
         // compose notification criteria and add the listener to the event 
         // service using that notification criteria to determine which events 
@@ -499,7 +505,7 @@ public class SyncReplRequestHandler impl
 
         replicaLog.setSearchCriteria( criteria );
 
-        dirService.getEventService().addListener( handler, criteria );
+        dirService.getEventService().addListener( replicationListener, criteria );
 
         // then start pushing initial content
         LessEqNode csnNode = new LessEqNode( CSN_AT, contexCsnValue );
@@ -513,32 +519,32 @@ public class SyncReplRequestHandler impl
 
         if ( searchDoneResp.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
         {
-            if( replicaLog.getLastSentCsn() == null )
+            if ( replicaLog.getLastSentCsn() == null )
             {
                 replicaLog.setLastSentCsn( contextCsn );
             }
-            
+
             byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn
);
 
             if ( refreshNPersist ) // refreshAndPersist mode
             {
                 sendContentFromLog( session, request, replicaLog, contextCsn );
-                cookie = LdapProtocolUtils.createCookie(replicaLog.getId(), replicaLog.getLastSentCsn());
+                cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), replicaLog.getLastSentCsn()
);
 
                 IntermediateResponse intermResp = new IntermediateResponseImpl( request.getMessageId()
);
                 intermResp.setResponseName( SyncInfoValue.OID );
 
-                SyncInfoValue syncInfo = new SyncInfoValueDecorator( 
+                SyncInfoValue syncInfo = new SyncInfoValueDecorator(
                     ldapServer.getDirectoryService().getLdapCodecService(), SynchronizationInfoEnum.NEW_COOKIE
);
                 syncInfo.setCookie( cookie );
-                intermResp.setResponseValue( ((SyncInfoValueDecorator)syncInfo).getValue()
);
+                intermResp.setResponseValue( ( ( SyncInfoValueDecorator ) syncInfo ).getValue()
);
 
                 PROVIDER_LOG.info( "Sending the intermediate response to consumer {}, {}",
replicaLog, syncInfo );
 
                 session.getIoSession().write( intermResp );
 
                 // switch the handler mode to realtime push
-                handler.setPushInRealTime( refreshNPersist );
+                replicationListener.setPushInRealTime( refreshNPersist );
             }
             else
             {
@@ -547,7 +553,8 @@ public class SyncReplRequestHandler impl
                     ldapServer.getDirectoryService().getLdapCodecService() );
                 syncDone.setCookie( cookie );
                 searchDoneResp.addControl( syncDone );
-                PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {},
{}", replicaLog, searchDoneResp );
+                PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {},
{}", replicaLog,
+                    searchDoneResp );
 
                 session.getIoSession().write( searchDoneResp );
             }
@@ -563,13 +570,13 @@ public class SyncReplRequestHandler impl
             replicaLog = null;
 
             // remove the listener
-            dirService.getEventService().removeListener( handler );
+            dirService.getEventService().removeListener( replicationListener );
 
             return;
         }
 
         // if all is well then store the consumer information
-        replicaUtil.addConsumerEntry(replicaLog );
+        replicaUtil.addConsumerEntry( replicaLog );
 
         // add to the map only after storing in the DIT, else the Replica update thread barfs
         replicaLogMap.put( replicaLog.getId(), replicaLog );
@@ -580,9 +587,10 @@ public class SyncReplRequestHandler impl
      * Process a search on the provider to get all the modified entries. We then send all
      * of them to the consumer
      */
-    private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req, ReplicaEventLog
replicaLog ) throws Exception
+    private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req, ReplicaEventLog
replicaLog )
+        throws Exception
     {
-        SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
+        SearchResultDone searchDoneResp = req.getResultResponse();
         LdapResult ldapResult = searchDoneResp.getLdapResult();
 
         // A normal search
@@ -639,7 +647,6 @@ public class SyncReplRequestHandler impl
     {
         long count = 0;
 
-        
         while ( ( count < sizeLimit ) && cursor.next() )
         {
             // Handle closed session
@@ -647,7 +654,8 @@ public class SyncReplRequestHandler impl
             {
                 // The client has closed the connection
                 LOG.debug( "Request terminated for message {}, the client has closed the
session", req.getMessageId() );
-                PROVIDER_LOG.debug( "Request terminated for message {}, the client has closed
the session", req.getMessageId() );
+                PROVIDER_LOG.debug( "Request terminated for message {}, the client has closed
the session",
+                    req.getMessageId() );
                 break;
             }
 
@@ -665,7 +673,7 @@ public class SyncReplRequestHandler impl
 
             String lastSentCsn = entry.get( CSN_AT ).getString();
             replicaLog.setLastSentCsn( lastSentCsn );
-            
+
             count++;
         }
 
@@ -946,7 +954,7 @@ public class SyncReplRequestHandler impl
         {
             List<ReplicaEventLog> eventLogs = replicaUtil.getReplicaEventLogs();
             Set<String> eventLogNames = new HashSet<String>();
-            
+
             if ( !eventLogs.isEmpty() )
             {
                 for ( ReplicaEventLog replica : eventLogs )
@@ -968,11 +976,11 @@ public class SyncReplRequestHandler impl
                 LOG.debug( "no replica logs found to initialize" );
                 PROVIDER_LOG.debug( "no replica logs found to initialize" );
             }
-            
+
             // remove unused logs
-            for( File f : getAllReplJournalNames() )
+            for ( File f : getAllReplJournalNames() )
             {
-                if( !eventLogNames.contains( f.getName() ) )
+                if ( !eventLogNames.contains( f.getName() ) )
                 {
                     f.delete();
                     LOG.info( "removed unused replication event log {}", f );
@@ -1009,8 +1017,9 @@ public class SyncReplRequestHandler impl
             {
                 LOG.warn( "invalid persistent search criteria {} for the replica {}", log.getSearchCriteria(),
log
                     .getId() );
-                PROVIDER_LOG.warn( "invalid persistent search criteria {} for the replica
{}", log.getSearchCriteria(), log
-                    .getId() );
+                PROVIDER_LOG.warn( "invalid persistent search criteria {} for the replica
{}", log.getSearchCriteria(),
+                    log
+                        .getId() );
             }
         }
     }
@@ -1064,7 +1073,7 @@ public class SyncReplRequestHandler impl
 
 
     /**
-     * Create a new ReplicaEventLog
+     * Create a new ReplicaEventLog. Each replica will have a unique ID, created by the provider.
      */
     private ReplicaEventLog createReplicaEventLog( String hostName, String filter ) throws
Exception
     {
@@ -1085,7 +1094,7 @@ public class SyncReplRequestHandler impl
      */
     private void sendESyncRefreshRequired( LdapSession session, SearchRequest req ) throws
Exception
     {
-        SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse();
+        SearchResultDone searchDoneResp = req.getResultResponse();
         searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.E_SYNC_REFRESH_REQUIRED
);
         SyncDoneValue syncDone = new SyncDoneValueDecorator(
             ldapServer.getDirectoryService().getLdapCodecService() );
@@ -1104,8 +1113,8 @@ public class SyncReplRequestHandler impl
 
         return control.getMode() == SynchronizationModeEnum.REFRESH_AND_PERSIST;
     }
-    
-    
+
+
     private File[] getAllReplJournalNames()
     {
         File replDir = dirService.getInstanceLayout().getReplDirectory();
@@ -1117,11 +1126,10 @@ public class SyncReplRequestHandler impl
                 return name.startsWith( ReplicaEventLog.REPLICA_EVENT_LOG_NAME_PREFIX );
             }
         };
-        
+
         return replDir.listFiles( filter );
     }
-    
-    
+
     /**
      * an event listener for handling deletions of replication event log entries present
under ou=consumers,ou=system
      */
@@ -1135,31 +1143,33 @@ public class SyncReplRequestHandler impl
             // lock this listener instance
             synchronized ( this )
             {
-                for( ReplicaEventLog log : replicaLogMap.values() )
+                for ( ReplicaEventLog log : replicaLogMap.values() )
                 {
-                    if( name.equalsIgnoreCase( log.getName() ) )
+                    if ( name.equalsIgnoreCase( log.getName() ) )
                     {
                         synchronized ( log )
                         {
                             dirService.getEventService().removeListener( log.getPersistentListener()
);
                             LOG.debug( "removed the persistent listener for replication event
log {}", consumerLogDn );
-                            
+
                             replicaLogMap.remove( log.getId() );
                             try
                             {
                                 // get the correct name, just incase cause we used equalsIgnoreCase
                                 name = log.getName();
                                 log.stop();
-                                
+
                                 new File( dirService.getInstanceLayout().getReplDirectory(),
name ).delete();
                                 LOG.info( "successfully removed replication event log {}",
consumerLogDn );
                             }
-                            catch( Exception e )
+                            catch ( Exception e )
                             {
-                                LOG.warn( "Closing the replication event log of the entry
{} was not successful, will be removed anyway", consumerLogDn, e );
+                                LOG.warn(
+                                    "Closing the replication event log of the entry {} was
not successful, will be removed anyway",
+                                    consumerLogDn, e );
                             }
                         }
-                        
+
                         break;
                     }
                 } // end of for



Mime
View raw message