Author: elecharny Date: Sat Dec 1 06:58:46 2012 New Revision: 1415931 URL: http://svn.apache.org/viewvc?rev=1415931&view=rev Log: o Added a convenient method in config to return the provider as host:port o Added some logs for consumer o made a lot of methods private o Renamed the _deleteEntries_ method to processDelete Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConfiguration.java directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConfiguration.java URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConfiguration.java?rev=1415931&r1=1415930&r2=1415931&view=diff ============================================================================== --- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConfiguration.java (original) +++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/SyncReplConfiguration.java Sat Dec 1 06:58:46 2012 @@ -156,6 +156,16 @@ public class SyncReplConfiguration imple { this.remoteHost = remoteHost; } + + + /** + * A convenient method that concatenates the host and port of the producer + * @return The host:port the consumer is connected to + */ + public String getProducer() + { + return remoteHost + ":" + remotePort; + } /** Modified: directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java?rev=1415931&r1=1415930&r2=1415931&view=diff 
============================================================================== --- directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java (original) +++ directory/apacheds/trunk/protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/consumer/ReplicationConsumerImpl.java Sat Dec 1 06:58:46 2012 @@ -228,18 +228,32 @@ public class ReplicationConsumerImpl imp connection.addConnectionClosedEventListener( this ); } - - // Do a bind - try + + // Try to connect + if ( connection.connect() ) { - connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword() ) ); - disconnected = false; - return true; + CONSUMER_LOG.info( "Consumer {} connected to producer{}", config.getReplicaId(), config.getProducer() ); + + // Do a bind + try + { + connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword() ) ); + disconnected = false; + + return true; + } + catch ( LdapException le ) + { + CONSUMER_LOG.warn( "Failed to bind to the producer {} with the given bind Dn {}", config.getProducer(), config.getReplUserDn() ); + LOG.warn( "Failed to bind to the server with the given bind Dn {}", config.getReplUserDn() ); + LOG.warn( "", le ); + } } - catch ( LdapException le ) + else { - LOG.warn( "Failed to bind to the server with the given bind Dn {}", config.getReplUserDn() ); - LOG.warn( "", le ); + CONSUMER_LOG.warn( "Consumer {} cannot connect to producer {}", config.getReplicaId(), config.getProducer() ); + + return false; } } catch ( Exception e ) @@ -253,11 +267,10 @@ public class ReplicationConsumerImpl imp /** - * * prepares a SearchRequest for syncing DIT content. 
* */ - public void prepareSyncSearchRequest() throws LdapException + private void prepareSyncSearchRequest() throws LdapException { String baseDn = config.getBaseDn(); @@ -279,10 +292,12 @@ public class ReplicationConsumerImpl imp searchRequest.addControl( new ManageDsaITDecorator( directoryService.getLdapCodecService(), new ManageDsaITImpl() ) ); } + + CONSUMER_LOG.debug( "Configuring consumer {}", config ); } - public ResultCodeEnum handleSearchDone( SearchResultDone searchDone ) + private ResultCodeEnum handleSearchDone( SearchResultDone searchDone ) { LOG.debug( "///////////////// handleSearchDone //////////////////" ); @@ -301,14 +316,14 @@ public class ReplicationConsumerImpl imp } - public void handleSearchReference( SearchResultReference searchRef ) + private void handleSearchReference( SearchResultReference searchRef ) { // this method won't be called cause the provider will serve the referrals as // normal entry objects due to the usage of ManageDsaITControl in the search request } - public void handleSearchResult( SearchResultEntry syncResult ) + private void handleSearchResult( SearchResultEntry syncResult ) { LOG.debug( "------------- starting handleSearchResult ------------" ); @@ -431,7 +446,7 @@ public class ReplicationConsumerImpl imp /** * {@inheritDoc} */ - public void handleSyncInfo( IntermediateResponse syncInfoResp ) + private void handleSyncInfo( IntermediateResponse syncInfoResp ) { try { @@ -494,8 +509,7 @@ public class ReplicationConsumerImpl imp */ public void connectionClosed() { - String producer = config.getRemoteHost() + ":" + config.getRemotePort(); - CONSUMER_LOG.debug( "Consumer {} session with {} has been closed ", config.getReplicaId(), producer ); + CONSUMER_LOG.debug( "Consumer {} session with {} has been closed ", config.getReplicaId(), config.getProducer() ); if ( disconnected ) { @@ -507,11 +521,11 @@ public class ReplicationConsumerImpl imp /** - * starts the synchronization operation + * Starts the synchronization 
operation */ - public void startSync() + private void startSync() { - CONSUMER_LOG.debug( "Starting the SyncRepl process" ); + CONSUMER_LOG.debug( "Starting the SyncRepl process for consumer {}", config.getReplicaId() ); // read the cookie if persisted readCookie(); @@ -552,8 +566,6 @@ public class ReplicationConsumerImpl imp */ public void start( boolean now ) { - String producer = config.getRemoteHost() + ":" + config.getRemotePort(); - boolean connected = false; if ( now ) @@ -566,7 +578,7 @@ public class ReplicationConsumerImpl imp try { CONSUMER_LOG.debug( "Consumer {} cannot connect to {}, wait 5 seconds.", config.getReplicaId(), - producer ); + config.getProducer() ); // try to establish a connection for every 5 seconds Thread.sleep( 5000 ); @@ -574,14 +586,12 @@ public class ReplicationConsumerImpl imp catch ( InterruptedException e ) { LOG.warn( "Consumer {} Interrupted while trying to reconnect to the provider {}", - config.getReplicaId(), producer ); + config.getReplicaId(), config.getProducer() ); } connected = connect(); } - CONSUMER_LOG.debug( "Consumer {} connected to {}", config.getReplicaId(), producer ); - startSync(); } @@ -705,7 +715,10 @@ public class ReplicationConsumerImpl imp } - public void disconnect() + /** + * Disconnect from the producer + */ + private void disconnect() { disconnected = true; @@ -717,10 +730,12 @@ public class ReplicationConsumerImpl imp } connection.unBind(); - LOG.info( "Unbound from the server {}", config.getRemoteHost() ); + LOG.info( "Unbound from the server {}", config.getProducer() ); + CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() ); connection.close(); - LOG.info( "Connection closed for the server {}", config.getRemoteHost() ); + LOG.info( "Connection closed for the server {}", config.getProducer() ); + CONSUMER_LOG.info( "Connection closed for the server {}", config.getProducer() ); connection = null; } @@ -788,7 +803,7 @@ public class ReplicationConsumerImpl imp /** - * read the cookie 
+ * Read the cookie for a consumer */ private void readCookie() { @@ -796,8 +811,6 @@ public class ReplicationConsumerImpl imp { Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ADS_REPL_COOKIE ); - CONSUMER_LOG.debug( "The cookie is stored in the DIT : {}", entry ); - if ( entry != null ) { Attribute attr = entry.get( REPL_COOKIE_AT ); @@ -806,15 +819,25 @@ public class ReplicationConsumerImpl imp { syncCookie = attr.getBytes(); lastSavedCookie = syncCookie; - CONSUMER_LOG.debug( "Read cookie : '{}'", attr ); + String syncCookieString = Strings.utf8ToString( syncCookie ); + CONSUMER_LOG.debug( "Cookie for consumer {} : {}", config.getReplicaId(), syncCookieString ); LOG.debug( "loaded cookie from DIT" ); } + else + { + CONSUMER_LOG.debug( "No cookie found for consumer {}", config.getReplicaId() ); + } + } + else + { + CONSUMER_LOG.debug( "Cannot find the configuration '{}' in the DIT for consumer {}", config.getConfigEntryDn(), config.getReplicaId() ); } } catch ( Exception e ) { // can be ignored, most likely happens if there is no entry with the given Dn // log in debug mode + CONSUMER_LOG.debug( "Cannot find the '{}' in the DIT for consumer {}", config.getConfigEntryDn(), config.getReplicaId() ); LOG.debug( "Failed to read the cookie from the entry {}", config.getConfigEntryDn(), e ); } } @@ -823,7 +846,7 @@ public class ReplicationConsumerImpl imp /** * deletes the cookie and resets the syncCookie to null */ - public void removeCookie() + private void removeCookie() { try { @@ -1053,7 +1076,7 @@ public class ReplicationConsumerImpl imp * @param replicaId TODO * @throws Exception in case of any problems while deleting the entries */ - public void deleteEntries( List uuidList, boolean isRefreshPresent, int replicaId ) throws Exception + private void deleteEntries( List uuidList, boolean isRefreshPresent, int replicaId ) throws Exception { if ( uuidList == null || uuidList.isEmpty() ) { @@ -1071,7 +1094,7 @@ public class 
ReplicationConsumerImpl imp if ( isRefreshPresent ) { LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() ); - _deleteEntries_( uuidList, isRefreshPresent, replicaId ); + processDelete( uuidList, isRefreshPresent, replicaId ); return; } @@ -1084,7 +1107,7 @@ public class ReplicationConsumerImpl imp for ( ; i < count; i++ ) { startIndex = i * NODE_LIMIT; - _deleteEntries_( uuidList.subList( startIndex, startIndex + NODE_LIMIT ), isRefreshPresent, replicaId ); + processDelete( uuidList.subList( startIndex, startIndex + NODE_LIMIT ), isRefreshPresent, replicaId ); } if ( ( uuidList.size() % NODE_LIMIT ) != 0 ) @@ -1094,7 +1117,8 @@ public class ReplicationConsumerImpl imp { startIndex = i * NODE_LIMIT; } - _deleteEntries_( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent, replicaId ); + + processDelete( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent, replicaId ); } } @@ -1106,7 +1130,7 @@ public class ReplicationConsumerImpl imp * @param isRefreshPresent a flag indicating the type of entries present in the UUID list * @param replicaId TODO */ - private void _deleteEntries_( List limitedUuidList, boolean isRefreshPresent, int replicaId ) throws Exception + private void processDelete( List limitedUuidList, boolean isRefreshPresent, int replicaId ) throws Exception { ExprNode filter = null; int size = limitedUuidList.size(); @@ -1155,8 +1179,7 @@ public class ReplicationConsumerImpl imp EntryFilteringCursor cursor = session.search( dn, SearchScope.SUBTREE, filter, AliasDerefMode.NEVER_DEREF_ALIASES, ENTRY_UUID_ATOP_SET ); cursor.beforeFirst(); - - List entryDnLst = new ArrayList(); + while ( cursor.next() ) { Entry entry = cursor.get(); @@ -1311,6 +1334,9 @@ public class ReplicationConsumerImpl imp } + /** + * @see Object#toString() + */ public String toString() { StringBuilder sb = new StringBuilder();