Return-Path: X-Original-To: apmail-directory-commits-archive@www.apache.org Delivered-To: apmail-directory-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 254A7DA20 for ; Wed, 31 Oct 2012 14:27:06 +0000 (UTC) Received: (qmail 2077 invoked by uid 500); 31 Oct 2012 14:27:05 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 2031 invoked by uid 500); 31 Oct 2012 14:27:05 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 2020 invoked by uid 99); 31 Oct 2012 14:27:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Oct 2012 14:27:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Oct 2012 14:27:01 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1CCF523888EA for ; Wed, 31 Oct 2012 14:26:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1404167 - in /directory/apacheds/trunk: core-shared/src/main/java/org/apache/directory/server/core/shared/partition/ server-integ/src/test/java/org/apache/directory/server/replication/ Date: Wed, 31 Oct 2012 14:26:15 -0000 To: commits@directory.apache.org From: kayyagari@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121031142616.1CCF523888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kayyagari Date: Wed Oct 31 14:26:15 2012 New Revision: 1404167 URL: http://svn.apache.org/viewvc?rev=1404167&view=rev Log: o set new or updated CSN value in PartitionNexus after an operation completed for replication o improved cookie handling in mock consumer o added various fixes to ClientInitialRefreshIT and also reduced the number of entries added (the behavior would be same for higher values) Modified: directory/apacheds/trunk/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultPartitionNexus.java directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java Modified: directory/apacheds/trunk/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultPartitionNexus.java URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultPartitionNexus.java?rev=1404167&r1=1404166&r2=1404167&view=diff ============================================================================== --- directory/apacheds/trunk/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultPartitionNexus.java (original) +++ directory/apacheds/trunk/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultPartitionNexus.java Wed Oct 31 14:26:15 2012 @@ -37,6 +37,7 @@ import javax.naming.ConfigurationExcepti import org.apache.directory.server.constants.ServerDNConstants; import org.apache.directory.server.core.api.CoreSession; import org.apache.directory.server.core.api.DirectoryService; +import org.apache.directory.server.core.api.InterceptorEnum; import org.apache.directory.server.core.api.entry.ClonedServerEntry; import org.apache.directory.server.core.api.filtering.BaseEntryFilteringCursor; import org.apache.directory.server.core.api.filtering.CursorList; @@ -504,6 +505,15 @@ public class DefaultPartitionNexus exten { Partition partition = getPartition( deleteContext.getDn() ); partition.delete( deleteContext ); + + Entry entry = deleteContext.getEntry(); + Attribute csn = entry.get( ENTRY_CSN_AT ); + // can be null while doing subentry deletion + //TODO verify if this gets in the way of replication + if ( csn != null ) + { + directoryService.setContextCsn( csn.getString() ); + } } @@ -590,7 +600,12 @@ public class DefaultPartitionNexus exten Partition partition = getPartition( modifyContext.getDn() ); partition.modify( modifyContext ); - + + if( modifyContext.isPushToEvtIntrcptor() ) + { + directoryService.getInterceptor( InterceptorEnum.EVENT_INTERCEPTOR.getName() ).modify( modifyContext ); + } + Entry alteredEntry = modifyContext.getAlteredEntry(); if ( alteredEntry != null ) @@ -608,10 +623,10 @@ public class DefaultPartitionNexus exten // Get the current partition Partition partition = getPartition( moveContext.getDn() ); - // We also have to get the new partition as it can be different - //Partition newBackend = getPartition( opContext.getNewDn() ); - partition.move( moveContext ); + + Entry entry = moveContext.getModifiedEntry(); + directoryService.setContextCsn( entry.get( ENTRY_CSN_AT ).getString() ); } @@ -622,6 +637,9 @@ public class DefaultPartitionNexus exten { Partition partition = getPartition( moveAndRenameContext.getDn() ); partition.moveAndRename( moveAndRenameContext ); + + Entry entry = moveAndRenameContext.getModifiedEntry(); + directoryService.setContextCsn( entry.get( ENTRY_CSN_AT ).getString() ); } @@ -632,6 +650,9 @@ public class DefaultPartitionNexus exten { Partition partition = getPartition( renameContext.getDn() ); partition.rename( renameContext ); + + Entry entry = renameContext.getModifiedEntry(); + directoryService.setContextCsn( entry.get( ENTRY_CSN_AT ).getString() ); } @@ -869,6 +890,7 @@ public class DefaultPartitionNexus exten if ( !partition.isInitialized() ) { + partition.setCacheService( directoryService.getCacheService() ); partition.initialize(); } Modified: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java?rev=1404167&r1=1404166&r2=1404167&view=diff ============================================================================== --- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java (original) +++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/ClientInitialRefreshIT.java Wed Oct 31 14:26:15 2012 @@ -23,8 +23,13 @@ package org.apache.directory.server.repl import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; import org.apache.directory.junit.tools.MultiThreadedMultiInvoker; import org.apache.directory.server.annotations.CreateLdapServer; import org.apache.directory.server.annotations.CreateTransport; @@ -48,7 +53,9 @@ import org.apache.directory.shared.ldap. import org.apache.directory.shared.ldap.model.message.SearchRequestImpl; import org.apache.directory.shared.ldap.model.name.Dn; import org.apache.directory.shared.ldap.model.schema.SchemaManager; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -72,7 +79,12 @@ public class ClientInitialRefreshIT private static AtomicInteger entryCount = new AtomicInteger(); + private static final int INSERT_COUNT = 10; + + private static final int TOTAL_COUNT = INSERT_COUNT + 1; + private static File cookiesDir; + @BeforeClass public static void setUp() throws Exception { @@ -81,20 +93,36 @@ public class ClientInitialRefreshIT startProvider(); // Load 1000 entries - for ( int i = 0; i < 1000; i++ ) + for ( int i = 0; i < INSERT_COUNT; i++ ) { Entry entry = createEntry(); providerSession.add( entry ); } + + cookiesDir = new File( FileUtils.getTempDirectory(), MockSyncReplConsumer.COOKIES_DIR_NAME ); } + + @Before + @After + public void deleteCookies() throws IOException + { + if( cookiesDir.exists() ) + { + FileUtils.cleanDirectory( cookiesDir ); + } + } + @AfterClass public static void tearDown() throws Exception { providerServer.stop(); providerServer.getDirectoryService().shutdown(); + + FileUtils.deleteDirectory( providerServer.getDirectoryService().getInstanceLayout().getInstanceDirectory() ); + FileUtils.deleteDirectory( cookiesDir ); } @@ -329,8 +357,8 @@ public class ClientInitialRefreshIT ReplicationConsumer consumer = createConsumer(); - // We should have 1000 entries plus the base entry = 1001 - assertTrue( waitForSyncReplClient( consumer, 1001 ) ); + // We should have 1000 entries plus the base entry = TOTAL_COUNT + assertTrue( waitForSyncReplClient( consumer, TOTAL_COUNT ) ); consumer.stop(); System.out.println( "\n<-- Done" ); @@ -348,16 +376,16 @@ public class ClientInitialRefreshIT ReplicationConsumer consumer = createConsumer(); - // We should have 1000 entries plus the base entry = 1001 - assertTrue( waitForSyncReplClient( consumer, 1001 ) ); + // We should have INSERT_COUNT entries plus the base entry = TOTAL_COUNT + assertTrue( waitForSyncReplClient( consumer, TOTAL_COUNT ) ); + + // Reset the added counter + ( ( MockSyncReplConsumer ) consumer ).resetNbAdded(); // Inject a new entry in the producer Entry addedEntry = createEntry(); providerSession.add( addedEntry ); - // Reset the added counter - ( ( MockSyncReplConsumer ) consumer ).resetNbAdded(); - // Now check that the entry has been copied in the consumer assertTrue( waitForSyncReplClient( consumer, 1 ) ); @@ -370,8 +398,8 @@ public class ClientInitialRefreshIT /** - * Test that we can load entries, kill the consumer in the middle of the load, - * restart the consumer and still get all the entries. + * Test that we can load entries, kill the consumer after the load (cause we cannot tell the producer when to stop), + * add some more entries and restart the consumer and get the remaining entries added when the consumer was down. */ @Test public void testInitialRefreshStopAndGo() throws Exception @@ -380,19 +408,34 @@ public class ClientInitialRefreshIT ReplicationConsumer consumer = createConsumer(); - // Load but stop after 200 entries have been loaded - waitUntilLimitSyncReplClient( 200, consumer ); + // Load all the entries + waitUntilLimitSyncReplClient( TOTAL_COUNT, consumer ); // Stop the consumer consumer.stop(); + int additionalCount = 10; + List newEntries = new ArrayList(); + for( int i = 0; i < additionalCount; i++ ) + { + // Inject a new entry in the producer + Entry addedEntry = createEntry(); + providerSession.add( addedEntry ); + newEntries.add( addedEntry.getDn() ); + } + // Start it again runConsumer( consumer ); - // We should have 1000 entries plus the base entry = 1001 - assertTrue( waitForSyncReplClient( consumer, 1001 ) ); + // We should get only the additional values cause consumer sends a cookie now + assertTrue( waitForSyncReplClient( consumer, additionalCount ) ); consumer.stop(); + for( Dn dn : newEntries ) + { + providerSession.delete( dn ); + } + System.out.println( "\n<-- Done" ); } @@ -410,8 +453,7 @@ public class ClientInitialRefreshIT ReplicationConsumer consumer3 = createConsumer(); ReplicationConsumer consumer4 = createConsumer(); - // Load but stop after 200 entries have been loaded - assertTrue( waitUntilLimitSyncReplClient( 1001, consumer1, consumer2, consumer3, consumer4 ) ); + assertTrue( waitUntilLimitSyncReplClient( TOTAL_COUNT, consumer1, consumer2, consumer3, consumer4 ) ); consumer1.stop(); consumer2.stop(); Modified: directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java URL: http://svn.apache.org/viewvc/directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java?rev=1404167&r1=1404166&r2=1404167&view=diff ============================================================================== --- directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java (original) +++ directory/apacheds/trunk/server-integ/src/test/java/org/apache/directory/server/replication/MockSyncReplConsumer.java Wed Oct 31 14:26:15 2012 @@ -37,6 +37,7 @@ import org.apache.directory.ldap.client. import org.apache.directory.ldap.client.api.future.SearchFuture; import org.apache.directory.server.core.api.DirectoryService; import org.apache.directory.server.core.api.filtering.EntryFilteringCursor; +import org.apache.directory.server.ldap.LdapProtocolUtils; import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig; import org.apache.directory.server.ldap.replication.SyncreplConfiguration; import org.apache.directory.server.ldap.replication.consumer.ReplicationConsumer; @@ -123,6 +124,10 @@ public class MockSyncReplConsumer implem /** The number of added entries */ private AtomicInteger nbAdded = new AtomicInteger( 0 ); + private File cookieDir; + + public static String COOKIES_DIR_NAME = "cookies"; + /** attributes on which modification should be ignored */ private static final String[] MOD_IGNORE_AT = new String[] { @@ -181,9 +186,8 @@ public class MockSyncReplConsumer implem cookieModLst = new ArrayList( 1 ); cookieModLst.add( cookieMod ); - File cookieDir = File.createTempFile( "cookies", "" ); - cookieDir.deleteOnExit(); - cookieFile = new File( cookieDir, String.valueOf( config.getReplicaId() ) ); + cookieDir = new File( System.getProperty( "java.io.tmpdir" ) + "/" + COOKIES_DIR_NAME ); + cookieDir.mkdirs(); prepareSyncSearchRequest(); } @@ -371,22 +375,33 @@ public class MockSyncReplConsumer implem { LOG.debug( "............... inside handleSyncInfo ..............." ); - SyncInfoValue decorator = new SyncInfoValueDecorator( ldapCodecService ); - byte[] syncinfo = syncInfoResp.getResponseValue(); - ( ( SyncInfoValueDecorator ) decorator ).setValue( syncinfo ); - SyncInfoValue syncInfoValue = ( ( SyncInfoValueDecorator ) decorator ).getDecorated(); + byte[] syncInfoBytes = syncInfoResp.getResponseValue(); + + if ( syncInfoBytes == null ) + { + return; + } + + SyncInfoValueDecorator decorator = new SyncInfoValueDecorator( ldapCodecService ); + SyncInfoValue syncInfoValue = ( SyncInfoValue ) decorator.decode( syncInfoBytes ); byte[] cookie = syncInfoValue.getCookie(); + int replicaId = -1; + if ( cookie != null ) { LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString( cookie ) ); syncCookie = cookie; + + String cookieString = Strings.utf8ToString( syncCookie ); + replicaId = LdapProtocolUtils.getReplicaId( cookieString ); } LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() ); List uuidList = syncInfoValue.getSyncUUIDs(); + // if refreshDeletes set to true then delete all the entries with entryUUID // present in the syncIdSet if ( syncInfoValue.isRefreshDeletes() ) @@ -404,8 +419,7 @@ public class MockSyncReplConsumer implem } catch ( Exception de ) { - LOG.error( "Failed to handle syncinfo message" ); - de.printStackTrace(); + LOG.error( "Failed to handle syncinfo message", de ); } LOG.debug( ".................... END handleSyncInfo ..............." ); @@ -606,18 +620,19 @@ public class MockSyncReplConsumer implem LOG.info( "Connection closed for the server {}", config.getRemoteHost() ); connection = null; - - // persist the cookie - storeCookie(); - - // reset the cookie - syncCookie = null; } catch ( Exception e ) { LOG.error( "Failed to close the connection", e ); } - + finally + { + // persist the cookie + storeCookie(); + + // reset the cookie + syncCookie = null; + } } @@ -638,6 +653,11 @@ public class MockSyncReplConsumer implem try { + if( cookieFile == null ) + { + cookieFile = new File( cookieDir, String.valueOf( LdapProtocolUtils.getReplicaId( new String( syncCookie ) ) ) ); + } + FileOutputStream fout = new FileOutputStream( cookieFile ); fout.write( syncCookie.length ); fout.write( syncCookie ); @@ -662,7 +682,7 @@ public class MockSyncReplConsumer implem { try { - if ( cookieFile.exists() && ( cookieFile.length() > 0 ) ) + if ( ( cookieFile != null ) && cookieFile.exists() && ( cookieFile.length() > 0 ) ) { FileInputStream fin = new FileInputStream( cookieFile ); syncCookie = new byte[fin.read()];