Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7C796200D51 for ; Fri, 8 Dec 2017 00:09:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 7ACFA160C1E; Thu, 7 Dec 2017 23:09:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 006E1160C0C for ; Fri, 8 Dec 2017 00:09:26 +0100 (CET) Received: (qmail 25775 invoked by uid 500); 7 Dec 2017 23:09:26 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 25760 invoked by uid 99); 7 Dec 2017 23:09:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Dec 2017 23:09:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8A64CDF9AB; Thu, 7 Dec 2017 23:09:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mhubail@apache.org To: commits@asterixdb.apache.org Date: Thu, 07 Dec 2017 23:09:26 -0000 Message-Id: <613a383d3158443a9cd66a1006957a27@git.apache.org> In-Reply-To: <280ebc46e2a14cfca48e347558ef49b3@git.apache.org> References: <280ebc46e2a14cfca48e347558ef49b3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] asterixdb git commit: [NO ISSUE][STO] Introduce Index Checkpoints archived-at: Thu, 07 Dec 2017 23:09:29 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java index 0f2ea50..1c57d1b 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java @@ -21,6 +21,8 @@ package org.apache.asterix.test.ioopcallbacks; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; +import org.apache.asterix.common.storage.IIndexCheckpointManager; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; @@ -45,8 +47,8 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); - LSMBTreeIOOperationCallback callback = - new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), + mockIndexCheckpointManagerProvider()); //request to flush first component callback.updateLastLSN(1); @@ -57,13 +59,14 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { callback.beforeOperation(LSMIOOperationType.FLUSH); Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); - callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + final ILSMDiskComponent diskComponent1 = mockDiskComponent(); + callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1); + callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1); Assert.assertEquals(2, callback.getComponentLSN(null)); - - callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); - callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + final ILSMDiskComponent diskComponent2 = mockDiskComponent(); + callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2); + callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2); } @Test @@ -72,8 +75,8 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); - LSMBTreeIOOperationCallback callback = - new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), + mockIndexCheckpointManagerProvider()); //request to flush first component callback.updateLastLSN(1); @@ -90,11 +93,13 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { //the scheduleFlush request would fail this time Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); - callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); - + final ILSMDiskComponent diskComponent1 = mockDiskComponent(); + callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1); + callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1); + final ILSMDiskComponent diskComponent2 = mockDiskComponent(); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2); + callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2); } @Test @@ -103,9 +108,8 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); - LSMBTreeIOOperationCallback callback = - new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); - + LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), + mockIndexCheckpointManagerProvider()); //request to flush first component callback.updateLastLSN(1); callback.beforeOperation(LSMIOOperationType.FLUSH); @@ -144,7 +148,8 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); - LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator); + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); ILSMComponentId initialId = idGenerator.getId(); // simulate a partition is flushed before allocated @@ -162,7 +167,8 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); - LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator); + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); ILSMComponentId id = idGenerator.getId(); callback.allocated(mockComponent); @@ -178,8 +184,9 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { callback.beforeOperation(LSMIOOperationType.FLUSH); callback.recycled(mockComponent, true); - callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); - callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + final ILSMDiskComponent diskComponent = mockDiskComponent(); + callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent); + callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent); checkMemoryComponent(expectedId, mockComponent); } } @@ -191,7 +198,8 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); - LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator); + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); ILSMComponentId id = idGenerator.getId(); callback.allocated(mockComponent); @@ -216,7 +224,8 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator); + LSMBTreeIOOperationCallback callback = + new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); ILSMComponentId id = idGenerator.getId(); callback.allocated(mockComponent); @@ -230,7 +239,9 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { callback.updateLastLSN(0); callback.beforeOperation(LSMIOOperationType.FLUSH); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + final ILSMDiskComponent diskComponent = mockDiskComponent(); + callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent); + callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent); // another flush is to be scheduled before the component is recycled idGenerator.refresh(); @@ -243,7 +254,9 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { // schedule the next flush callback.updateLastLSN(0); callback.beforeOperation(LSMIOOperationType.FLUSH); - callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + final ILSMDiskComponent diskComponent2 = mockDiskComponent(); + callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2); + callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2); callback.recycled(mockComponent, true); checkMemoryComponent(nextId, mockComponent); } @@ -263,5 +276,14 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { return component; } - protected abstract AbstractLSMIOOperationCallback getIoCallback(); + protected IIndexCheckpointManagerProvider mockIndexCheckpointManagerProvider() throws HyracksDataException { + IIndexCheckpointManagerProvider indexCheckpointManagerProvider = + Mockito.mock(IIndexCheckpointManagerProvider.class); + IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class); + Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong()); + Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any()); + return indexCheckpointManagerProvider; + } + + protected abstract AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java index c22e2e3..a4bc399 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java @@ -21,6 +21,7 @@ package org.apache.asterix.test.ioopcallbacks; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.mockito.Mockito; @@ -28,10 +29,11 @@ import org.mockito.Mockito; public class LSMBTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest { @Override - protected AbstractLSMIOOperationCallback getIoCallback() { + protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), + mockIndexCheckpointManagerProvider()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java index 356c80a..5f37c78 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java @@ -21,6 +21,7 @@ package org.apache.asterix.test.ioopcallbacks; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.mockito.Mockito; @@ -28,10 +29,11 @@ import org.mockito.Mockito; public class LSMBTreeWithBuddyIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest { @Override - protected AbstractLSMIOOperationCallback getIoCallback() { + protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), + mockIndexCheckpointManagerProvider()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java index ac4595e..343bc59 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java @@ -21,6 +21,7 @@ package org.apache.asterix.test.ioopcallbacks; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.mockito.Mockito; @@ -28,10 +29,11 @@ import org.mockito.Mockito; public class LSMInvertedIndexIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest { @Override - protected AbstractLSMIOOperationCallback getIoCallback() { + protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), + mockIndexCheckpointManagerProvider()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java index 0131e3f..10d95d8 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java @@ -21,6 +21,7 @@ package org.apache.asterix.test.ioopcallbacks; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.mockito.Mockito; @@ -28,10 +29,11 @@ import org.mockito.Mockito; public class LSMRTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest { @Override - protected AbstractLSMIOOperationCallback getIoCallback() { + protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); - return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator()); + return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), + mockIndexCheckpointManagerProvider()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java index 814e109..1434629 100644 --- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java +++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java @@ -19,43 +19,18 @@ package org.apache.asterix.installer.test; import java.io.File; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.logging.Logger; -import org.apache.asterix.event.error.VerificationUtil; -import org.apache.asterix.event.model.AsterixInstance; import org.apache.asterix.event.model.AsterixInstance.State; -import org.apache.asterix.event.model.AsterixRuntimeState; -import org.apache.asterix.event.service.ServiceProvider; -import org.apache.asterix.installer.command.CommandHandler; -import org.apache.asterix.test.common.TestExecutor; -import org.apache.asterix.testframework.context.TestCaseContext; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.FixMethodOrder; -import org.junit.Test; -import org.junit.runners.MethodSorters; -import org.junit.runners.Parameterized.Parameters; - -@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class AsterixLifecycleIT { - private static final int NUM_NC = 2; - private static final CommandHandler cmdHandler = new CommandHandler(); - private static final String PATH_BASE = "src/test/resources/integrationts/lifecycle"; private static final String PATH_ACTUAL = "target" + File.separator + "ittest" + File.separator; - private static final Logger LOGGER = Logger.getLogger(AsterixLifecycleIT.class.getName()); - private static List testCaseCollection; - private final TestExecutor testExecutor = new TestExecutor(); @BeforeClass public static void setUp() throws Exception { AsterixInstallerIntegrationUtil.init(AsterixInstallerIntegrationUtil.LOCAL_CLUSTER_PATH); - TestCaseContext.Builder b = new TestCaseContext.Builder(); - testCaseCollection = b.build(new File(PATH_BASE)); File outdir = new File(PATH_ACTUAL); outdir.mkdirs(); } @@ -70,88 +45,8 @@ public class AsterixLifecycleIT { } } - @Parameters - public static Collection tests() throws Exception { - Collection testArgs = new ArrayList<>(); - return testArgs; - } - public static void restartInstance() throws Exception { AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE); AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE); } - - @Test - public void test_1_StopActiveInstance() throws Exception { - try { - LOGGER.info("Starting test: test_1_StopActiveInstance"); - AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE); - String command = "stop -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME; - cmdHandler.processCommand(command.split(" ")); - Thread.sleep(4000); - AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService() - .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME); - AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance); - assert (state.getFailedNCs().size() == NUM_NC && !state.isCcRunning()); - LOGGER.info("PASSED: test_1_StopActiveInstance"); - } catch (Exception e) { - throw new Exception("Test configure installer " + "\" FAILED!", e); - } - } - - @Test - public void test_2_StartActiveInstance() throws Exception { - try { - LOGGER.info("Starting test: test_2_StartActiveInstance"); - AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE); - String command = "start -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME; - cmdHandler.processCommand(command.split(" ")); - AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService() - .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME); - AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance); - assert (state.getFailedNCs().size() == 0 && state.isCcRunning()); - LOGGER.info("PASSED: test_2_StartActiveInstance"); - } catch (Exception e) { - throw new Exception("Test configure installer " + "\" FAILED!", e); - } - } - - @Test - public void test_3_DeleteActiveInstance() throws Exception { - try { - LOGGER.info("Starting test: test_3_DeleteActiveInstance"); - AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE); - String command = "delete -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME; - cmdHandler.processCommand(command.split(" ")); - AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService() - .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME); - assert (instance == null); - LOGGER.info("PASSED: test_3_DeleteActiveInstance"); - } catch (Exception e) { - throw new Exception("Test delete active instance " + "\" FAILED!", e); - } finally { - // recreate instance - AsterixInstallerIntegrationUtil.createInstance(); - } - } - - @Test - public void test() throws Exception { - for (TestCaseContext testCaseCtx : testCaseCollection) { - testExecutor.executeTest(PATH_ACTUAL, testCaseCtx, null, false); - } - } - - public static void main(String[] args) throws Exception { - try { - setUp(); - new AsterixLifecycleIT().test(); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.info("TEST CASE(S) FAILED"); - } finally { - tearDown(); - } - } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java index 3b2aff7..0444952 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java @@ -27,10 +27,6 @@ public class RemoteLogMapping { private long localLSN; public AtomicInteger numOfFlushedIndexes = new AtomicInteger(); - public String getRemoteNodeID() { - return remoteNodeID; - } - public void setRemoteNodeID(String remoteNodeID) { this.remoteNodeID = remoteNodeID; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index 3143284..dfc23d3 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -27,9 +27,6 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -39,13 +36,17 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.context.IndexInfo; +import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.replication.IReplicaResourcesManager; @@ -55,6 +56,9 @@ import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.IReplicationThread; import org.apache.asterix.common.replication.ReplicaEvent; import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.IIndexCheckpointManager; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; @@ -72,9 +76,13 @@ import org.apache.asterix.replication.storage.LSMComponentProperties; import org.apache.asterix.replication.storage.LSMIndexFileProperties; import org.apache.asterix.replication.storage.ReplicaResourcesManager; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.asterix.transaction.management.service.logging.LogBuffer; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; +import org.apache.hyracks.storage.common.LocalResource; import org.apache.hyracks.util.StorageUtil; import org.apache.hyracks.util.StorageUtil.StorageUnit; @@ -89,7 +97,6 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { private final String localNodeID; private final ILogManager logManager; private final ReplicaResourcesManager replicaResourcesManager; - private SocketChannel socketChannel = null; private ServerSocketChannel serverSocketChannel = null; private final IReplicationManager replicationManager; private final ReplicationProperties replicationProperties; @@ -98,6 +105,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { private final LinkedBlockingQueue lsmComponentRemoteLSN2LocalLSNMappingTaskQ; private final LinkedBlockingQueue pendingNotificationRemoteLogsQ; private final Map lsmComponentId2PropertiesMap; + private final Map localLsn2RemoteMapping; private final Map replicaUniqueLSN2RemoteMapping; private final LSMComponentsSyncService lsmComponentLSNMappingService; private final Set nodeHostedPartitions; @@ -105,6 +113,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { private final Object flushLogslock = new Object(); private final IDatasetLifecycleManager dsLifecycleManager; private final PersistentLocalResourceRepository localResourceRep; + private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager, IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager, @@ -122,6 +131,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>(); lsmComponentId2PropertiesMap = new ConcurrentHashMap<>(); replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>(); + localLsn2RemoteMapping = new ConcurrentHashMap<>(); lsmComponentLSNMappingService = new LSMComponentsSyncService(); replicationNotifier = new ReplicationNotifier(); replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory()); @@ -136,6 +146,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { } nodeHostedPartitions = new HashSet<>(clientsPartitions.size()); nodeHostedPartitions.addAll(clientsPartitions); + this.indexCheckpointManagerProvider = + ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider(); } @Override @@ -156,7 +168,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { //start accepting replication requests while (true) { - socketChannel = serverSocketChannel.accept(); + SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(true); //start a new thread to handle the request replicationThreads.execute(new ReplicationThread(socketChannel)); @@ -349,16 +361,19 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { } if (afp.isLSMComponentFile()) { String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath()); - if (afp.getLSNByteOffset() > AbstractLSMIOOperationCallback.INVALID) { - LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId, - destFile.getAbsolutePath(), afp.getLSNByteOffset()); + final LSMComponentProperties lsmComponentProperties = lsmComponentId2PropertiesMap.get(componentId); + // merge operations do not generate flush logs + if (afp.requiresAck() && lsmComponentProperties.getOpType() == LSMOperationType.FLUSH) { + LSMComponentLSNSyncTask syncTask = + new LSMComponentLSNSyncTask(componentId, destFile.getAbsolutePath()); lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask); } else { updateLSMComponentRemainingFiles(componentId); } } else { //index metadata file - replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN()); + final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath()); + indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN()); } } } @@ -402,8 +417,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r"); FileChannel fileChannel = fromFile.getChannel();) { long fileSize = fileChannel.size(); - fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false, - AbstractLSMIOOperationCallback.INVALID, false); + fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false, false); outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties, ReplicationRequestType.REPLICATE_FILE); @@ -462,7 +476,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { switch (remoteLog.getLogType()) { case LogType.UPDATE: case LogType.ENTITY_COMMIT: - //if the log partition belongs to a partitions hosted on this node, replicate it + //if the log partition belongs to a partitions hosted on this node, replicated it if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) { logManager.log(remoteLog); } @@ -479,13 +493,21 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { case LogType.FLUSH: //store mapping information for flush logs to use them in incoming LSM components. RemoteLogMapping flushLogMap = new RemoteLogMapping(); + LogRecord flushLog = new LogRecord(); + TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null, + remoteLog.getNodeId(), remoteLog.getNumOfFlushedIndexes()); + flushLog.setReplicationThread(this); + flushLog.setLogSource(LogSource.REMOTE); flushLogMap.setRemoteNodeID(remoteLog.getNodeId()); flushLogMap.setRemoteLSN(remoteLog.getLSN()); - logManager.log(remoteLog); - //the log LSN value is updated by logManager.log(.) to a local value - flushLogMap.setLocalLSN(remoteLog.getLSN()); - flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes()); - replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap); + synchronized (localLsn2RemoteMapping) { + logManager.log(flushLog); + //the log LSN value is updated by logManager.log(.) to a local value + flushLogMap.setLocalLSN(flushLog.getLSN()); + flushLogMap.numOfFlushedIndexes.set(flushLog.getNumOfFlushedIndexes()); + replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap); + localLsn2RemoteMapping.put(flushLog.getLSN(), flushLogMap); + } synchronized (flushLogslock) { flushLogslock.notify(); } @@ -497,18 +519,55 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { } /** - * this method is called sequentially by LogPage (notifyReplicationTerminator) - * for JOB_COMMIT and JOB_ABORT log types. + * this method is called sequentially by {@link LogBuffer#notifyReplicationTermination()} + * for JOB_COMMIT, JOB_ABORT, and FLUSH log types. */ @Override public void notifyLogReplicationRequester(LogRecord logRecord) { - pendingNotificationRemoteLogsQ.offer(logRecord); + switch (logRecord.getLogType()) { + case LogType.JOB_COMMIT: + case LogType.ABORT: + pendingNotificationRemoteLogsQ.offer(logRecord); + break; + case LogType.FLUSH: + final RemoteLogMapping remoteLogMapping; + synchronized (localLsn2RemoteMapping) { + remoteLogMapping = localLsn2RemoteMapping.remove(logRecord.getLSN()); + } + checkpointReplicaIndexes(remoteLogMapping, logRecord.getDatasetId()); + break; + default: + throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType()); + } } @Override public SocketChannel getReplicationClientSocket() { return socketChannel; } + + private void checkpointReplicaIndexes(RemoteLogMapping remoteLogMapping, int datasetId) { + try { + Predicate replicaIndexesPredicate = lr -> { + DatasetLocalResource dls = (DatasetLocalResource) lr.getResource(); + return dls.getDatasetId() == datasetId && !localResourceRep.getActivePartitions() + .contains(dls.getPartition()); + }; + final Map resources = localResourceRep.getResources(replicaIndexesPredicate); + final List replicaIndexesRef = + resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); + for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) { + final IIndexCheckpointManager indexCheckpointManager = + indexCheckpointManagerProvider.get(replicaIndexRef); + synchronized (indexCheckpointManager) { + indexCheckpointManager + .masterFlush(remoteLogMapping.getRemoteLSN(), remoteLogMapping.getLocalLSN()); + } + } + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failed to checkpoint replica indexes", e); + } + } } /** @@ -541,7 +600,6 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { * the received LSM components to a local LSN. */ private class LSMComponentsSyncService extends Thread { - private static final int BULKLOAD_LSN = 0; @Override public void run() { @@ -560,90 +618,22 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", e); } } - } } private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask) throws InterruptedException, IOException { - long remoteLSN = lsmCompProp.getOriginalLSN(); - //LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it - if (remoteLSN == BULKLOAD_LSN) { - //since this is the first LSM component of this index, - //then set the mapping in the LSN_MAP to the current log LSN because - //no other log could've been received for this index since bulkload replication is synchronous. - lsmCompProp.setReplicaLSN(logManager.getAppendLSN()); - return; - } - - //path to the LSM component file - Path path = Paths.get(syncTask.getComponentFilePath()); - if (lsmCompProp.getReplicaLSN() == null) { - if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) { - //need to look up LSN mapping from memory - RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); - //wait until flush log arrives, and verify the LSM component file still exists - //The component file could be deleted if its NC fails. - while (remoteLogMap == null && Files.exists(path)) { - synchronized (flushLogslock) { - flushLogslock.wait(); - } - remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); - } - - /** - * file has been deleted due to its remote primary replica failure - * before its LSN could've been synchronized. - */ - if (remoteLogMap == null) { - return; - } - lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN()); - } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) { - //need to load the LSN mapping from disk - Map lsmMap = replicaResourcesManager - .getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(replicaResourcesManager)); - Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN()); - if (mappingLSN == null) { - /** - * this shouldn't happen unless this node just recovered and - * the first component it received is a merged component due - * to an on-going merge operation while recovery on the remote - * replica. In this case, we use the current append LSN since - * no new records exist for this index, otherwise they would've - * been flushed. This could be prevented by waiting for any IO - * to finish on the remote replica during recovery. - */ - mappingLSN = logManager.getAppendLSN(); - } - lsmCompProp.setReplicaLSN(mappingLSN); + final String componentFilePath = syncTask.getComponentFilePath(); + final ResourceReference indexRef = ResourceReference.of(componentFilePath); + final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(indexRef); + synchronized (indexCheckpointManager) { + long masterLsn = lsmCompProp.getOriginalLSN(); + // wait until the lsn mapping is flushed to disk + while (!indexCheckpointManager.isFlushed(masterLsn)) { + indexCheckpointManager.wait(); } - } - - if (Files.notExists(path)) { - /** - * This could happen when a merged component arrives and deletes - * the flushed component (which we are trying to update) before - * its flush log arrives since logs and components are received - * on different threads. - */ - return; - } - - File destFile = new File(syncTask.getComponentFilePath()); - //prepare local LSN buffer - ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES); - metadataBuffer.putLong(lsmCompProp.getReplicaLSN()); - metadataBuffer.flip(); - - //replace the remote LSN value by the local one - try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw"); - FileChannel fileChannel = fileOutputStream.getChannel()) { - long lsnStartOffset = syncTask.getLSNByteOffset(); - while (metadataBuffer.hasRemaining()) { - lsnStartOffset += fileChannel.write(metadataBuffer, lsnStartOffset); - } - fileChannel.force(true); + indexCheckpointManager + .replicated(AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()), masterLsn); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index 48c7083..5cf7eab 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -54,7 +54,6 @@ import java.util.stream.Collectors; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.asterix.common.dataflow.LSMIndexUtil; import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; @@ -63,6 +62,8 @@ import org.apache.asterix.common.replication.Replica.ReplicaState; import org.apache.asterix.common.replication.ReplicaEvent; import org.apache.asterix.common.replication.ReplicationJob; import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogRecord; @@ -84,7 +85,6 @@ import org.apache.hyracks.api.replication.IReplicationJob; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; import org.apache.hyracks.util.StorageUtil; import org.apache.hyracks.util.StorageUtil.StorageUnit; @@ -136,6 +136,7 @@ public class ReplicationManager implements IReplicationManager { private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES); private final IReplicationStrategy replicationStrategy; private final PersistentLocalResourceRepository localResourceRepo; + private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; //TODO this class needs to be refactored by moving its private classes to separate files //and possibly using MessageBroker to send/receive remote replicas events. @@ -148,6 +149,8 @@ public class ReplicationManager implements IReplicationManager { this.replicaResourcesManager = (ReplicaResourcesManager) remoteResoucesManager; this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; this.logManager = logManager; + this.indexCheckpointManagerProvider = + asterixAppRuntimeContextProvider.getAppContext().getIndexCheckpointManagerProvider(); localResourceRepo = (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider.getLocalResourceRepository(); this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3); @@ -284,7 +287,6 @@ public class ReplicationManager implements IReplicationManager { if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) { return; } - int jobPartitionId = indexFileRef.getPartitionId(); ByteBuffer responseBuffer = null; @@ -327,25 +329,10 @@ public class ReplicationManager implements IReplicationManager { FileChannel fileChannel = fromFile.getChannel();) { long fileSize = fileChannel.size(); - - if (LSMComponentJob != null) { - /** - * since this is LSM_COMPONENT REPLICATE job, the job will contain - * only the component being replicated. - */ - ILSMDiskComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext() - .getComponentsToBeReplicated().get(0); - long lsnOffset = LSMIndexUtil.getComponentFileLSNOffset(LSMComponentJob.getLSMIndex(), - diskComponent, filePath); - asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, - lsnOffset, remainingFiles == 0); - } else { - asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, -1L, - remainingFiles == 0); - } + asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, + remainingFiles == 0); requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties, ReplicationRequestType.REPLICATE_FILE); - Iterator> iterator = replicasSockets.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); @@ -378,8 +365,7 @@ public class ReplicationManager implements IReplicationManager { } else if (job.getOperation() == ReplicationOperation.DELETE) { for (String filePath : job.getJobFiles()) { remainingFiles--; - asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, -1L, - remainingFiles == 0); + asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, remainingFiles == 0); ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties, ReplicationRequestType.DELETE_FILE); @@ -1026,10 +1012,10 @@ public class ReplicationManager implements IReplicationManager { ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); for (String replicaId : replicaIds) { //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN. - Map laggingIndexes = + Map laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId, nonSharpCheckpointTargetLSN); - if (laggingIndexes.size() > 0) { + if (!laggingIndexes.isEmpty()) { //2. send request to remote replicas that have lagging indexes. ReplicaIndexFlushRequest laggingIndexesResponse = null; try (SocketChannel socketChannel = getReplicaSocket(replicaId)) { @@ -1052,16 +1038,14 @@ public class ReplicationManager implements IReplicationManager { } /* - * 4. update the LSN_MAP for indexes that were not flushed + * 4. update checkpoints for indexes that were not flushed * to the current append LSN to indicate no operations happened * since the checkpoint start. */ if (laggingIndexesResponse != null) { for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) { - String indexPath = laggingIndexes.get(resouceId); - Map indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath); - indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN); - replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap); + final DatasetResourceReference datasetResourceReference = laggingIndexes.get(resouceId); + indexCheckpointManagerProvider.get(datasetResourceReference).advanceLowWatermark(startLSN); } } } @@ -1141,12 +1125,11 @@ public class ReplicationManager implements IReplicationManager { fileChannel.force(true); } - //we need to create LSN map for .metadata files that belong to remote replicas + //we need to create initial map for .metadata files that belong to remote replicas if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) { - //replica index - replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN()); + final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath()); + indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN()); } - responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java index f11adc2..08c0ec7 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java @@ -19,33 +19,21 @@ package org.apache.asterix.replication.storage; public class LSMComponentLSNSyncTask { + private String componentFilePath; private String componentId; - private long LSNByteOffset; - public LSMComponentLSNSyncTask(String componentId, String componentFilePath, long LSNByteOffset) { + public LSMComponentLSNSyncTask(String componentId, String componentFilePath) { this.componentId = componentId; this.componentFilePath = componentFilePath; - this.LSNByteOffset = LSNByteOffset; } public String getComponentFilePath() { return componentFilePath; } - public void setComponentFilePath(String componentFilePath) { - this.componentFilePath = componentFilePath; - } - public String getComponentId() { return componentId; } - public void setComponentId(String componentId) { - this.componentId = componentId; - } - - public long getLSNByteOffset() { - return LSNByteOffset; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java index 7ca6f2f..bf987d0 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java @@ -52,9 +52,10 @@ public class LSMComponentProperties { this.nodeId = nodeId; componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]); numberOfFiles = new AtomicInteger(job.getJobFiles().size()); - originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), - job.getLSMIndexOperationContext()); opType = job.getLSMOpType(); + originalLSN = opType == LSMOperationType.FLUSH ? + LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), + job.getLSMIndexOperationContext()) : 0; } public LSMComponentProperties() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java index f2747fe..2ebf2cb 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java @@ -31,27 +31,25 @@ public class LSMIndexFileProperties { private boolean lsmComponentFile; private String filePath; private boolean requiresAck = false; - private long LSNByteOffset; public LSMIndexFileProperties() { } public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, - long LSNByteOffset, boolean requiresAck) { - initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck); + boolean requiresAck) { + initialize(filePath, fileSize, nodeId, lsmComponentFile, requiresAck); } public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) { - initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, -1L, false); + initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, false); } - public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset, + public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, boolean requiresAck) { this.filePath = filePath; this.fileSize = fileSize; this.nodeId = nodeId; this.lsmComponentFile = lsmComponentFile; - this.LSNByteOffset = LSNByteOffset; this.requiresAck = requiresAck; } @@ -61,7 +59,6 @@ public class LSMIndexFileProperties { dos.writeUTF(filePath); dos.writeLong(fileSize); dos.writeBoolean(lsmComponentFile); - dos.writeLong(LSNByteOffset); dos.writeBoolean(requiresAck); } @@ -70,10 +67,9 @@ public class LSMIndexFileProperties { String filePath = input.readUTF(); long fileSize = input.readLong(); boolean lsmComponentFile = input.readBoolean(); - long LSNByteOffset = input.readLong(); boolean requiresAck = input.readBoolean(); - LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile, - LSNByteOffset, requiresAck); + LSMIndexFileProperties fileProp = + new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile, requiresAck); return fileProp; } @@ -108,11 +104,6 @@ public class LSMIndexFileProperties { sb.append("File Size: " + fileSize + " "); sb.append("Node ID: " + nodeId + " "); sb.append("isLSMComponentFile : " + lsmComponentFile + " "); - sb.append("LSN Byte Offset: " + LSNByteOffset); return sb.toString(); } - - public long getLSNByteOffset() { - return LSNByteOffset; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java index 7eea4a4..2ff74a8 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java @@ -19,27 +19,26 @@ package org.apache.asterix.replication.storage; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.stream.Collectors; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; @@ -50,16 +49,15 @@ import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; public class ReplicaResourcesManager implements IReplicaResourcesManager { - private static final Logger LOGGER = Logger.getLogger(ReplicaResourcesManager.class.getName()); - public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask"; - private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP"; - public static final long REPLICA_INDEX_CREATION_LSN = -1; + public static final String LSM_COMPONENT_MASK_SUFFIX = "_mask"; private final PersistentLocalResourceRepository localRepository; private final Map nodePartitions; + private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; - public ReplicaResourcesManager(ILocalResourceRepository localRepository, - MetadataProperties metadataProperties) { + public ReplicaResourcesManager(ILocalResourceRepository localRepository, MetadataProperties metadataProperties, + IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { this.localRepository = (PersistentLocalResourceRepository) localRepository; + this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; nodePartitions = metadataProperties.getNodePartitions(); } @@ -86,12 +84,6 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { return indexPath.toString(); } - public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException { - HashMap lsnMap = new HashMap(); - lsnMap.put(REPLICA_INDEX_CREATION_LSN, currentLSN); - updateReplicaIndexLSNMap(indexPath, lsnMap); - } - public void createRemoteLSMComponentMask(LSMComponentProperties lsmComponentProperties) throws IOException { String maskPath = lsmComponentProperties.getMaskPath(this); Path path = Paths.get(maskPath); @@ -106,13 +98,6 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { String maskPath = lsmComponentProperties.getMaskPath(this); Path path = Paths.get(maskPath); Files.deleteIfExists(path); - - //add component LSN to the index LSNs map - Map lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this)); - lsnMap.put(lsmComponentProperties.getOriginalLSN(), lsmComponentProperties.getReplicaLSN()); - - //update map on disk - updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), lsnMap); } public Set getReplicaIndexes(String replicaId) throws HyracksDataException { @@ -128,58 +113,37 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { public long getPartitionsMinLSN(Set partitions) throws HyracksDataException { long minRemoteLSN = Long.MAX_VALUE; for (Integer partition : partitions) { - //for every index in replica - Set remoteIndexes = localRepository.getPartitionIndexes(partition); - for (File indexFolder : remoteIndexes) { - //read LSN map - try { - //get max LSN per index - long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder); - - //get min of all maximums - minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); - } catch (IOException e) { - LOGGER.log(Level.INFO, - indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder); - continue; - } + final List partitionResources = localRepository.getResources(resource -> { + DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); + return dsResource.getPartition() == partition; + }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); + for (DatasetResourceReference indexRef : partitionResources) { + long remoteIndexMaxLSN = indexCheckpointManagerProvider.get(indexRef).getLowWatermark(); + minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); } } return minRemoteLSN; } - public Map getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) throws IOException { - Map laggingReplicaIndexes = new HashMap(); - try { - //for every index in replica - Set remoteIndexes = getReplicaIndexes(replicaId); - for (File indexFolder : remoteIndexes) { - if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) { - File localResource = new File( - indexFolder + File.separator + StorageConstants.METADATA_FILE_NAME); - LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource); - laggingReplicaIndexes.put(resource.getId(), indexFolder.getAbsolutePath()); + public Map getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) + throws HyracksDataException { + Map laggingReplicaIndexes = new HashMap<>(); + final List replicaPartitions = + Arrays.stream(nodePartitions.get(replicaId)).map(ClusterPartition::getPartitionId) + .collect(Collectors.toList()); + for (int patition : replicaPartitions) { + final Map partitionResources = localRepository.getPartitionResources(patition); + final List indexesRefs = + partitionResources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); + for (DatasetResourceReference ref : indexesRefs) { + if (indexCheckpointManagerProvider.get(ref).getLowWatermark() < targetLSN) { + laggingReplicaIndexes.put(ref.getResourceId(), ref); } } - } catch (HyracksDataException e) { - e.printStackTrace(); } - return laggingReplicaIndexes; } - private long getReplicaIndexMaxLSN(File indexFolder) throws IOException { - long remoteIndexMaxLSN = 0; - //get max LSN per index - Map lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath()); - if (lsnMap != null) { - for (Long lsn : lsnMap.values()) { - remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn); - } - } - return remoteIndexMaxLSN; - } - public void cleanInvalidLSMComponents(String replicaId) { //for every index in replica Set remoteIndexes = null; @@ -214,28 +178,6 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { } } - @SuppressWarnings({ "unchecked" }) - public synchronized Map getReplicaIndexLSNMap(String indexPath) throws IOException { - try (FileInputStream fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME); - ObjectInputStream oisFromFis = new ObjectInputStream(fis)) { - Map lsnMap = null; - try { - lsnMap = (Map) oisFromFis.readObject(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } - return lsnMap; - } - } - - public synchronized void updateReplicaIndexLSNMap(String indexPath, Map lsnMap) throws IOException { - try (FileOutputStream fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME); - ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { - oosToFos.writeObject(lsnMap); - oosToFos.flush(); - } - } - /** * @param partition * @return Absolute paths to all partition files http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index e87a39b..ba43fb7 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -29,6 +29,7 @@ import java.io.ObjectOutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -38,8 +39,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.MetadataProperties; @@ -48,10 +47,12 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.ReplicationJob; import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.IOFileFilter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; @@ -72,7 +73,29 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito public static final Predicate INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME); // Private constants private static final int MAX_CACHED_RESOURCES = 1000; - private static final int RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 6; + private static final IOFileFilter METADATA_FILES_FILTER = new IOFileFilter() { + @Override + public boolean accept(File file) { + return file.getName().equals(StorageConstants.METADATA_FILE_NAME); + } + + @Override + public boolean accept(File dir, String name) { + return false; + } + }; + + private static final IOFileFilter ALL_DIR_FILTER = new IOFileFilter() { + @Override + public boolean accept(File file) { + return true; + } + + @Override + public boolean accept(File dir, String name) { + return true; + } + }; // Finals private final IIOManager ioManager; @@ -85,15 +108,17 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito private IReplicationManager replicationManager; private Set nodeInactivePartitions; private final Path[] storageRoots; + private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; - public PersistentLocalResourceRepository(IIOManager ioManager, String nodeId, - MetadataProperties metadataProperties) { + public PersistentLocalResourceRepository(IIOManager ioManager, String nodeId, MetadataProperties metadataProperties, + IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { this.ioManager = ioManager; + this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; storageRoots = new Path[ioManager.getIODevices().size()]; final List ioDevices = ioManager.getIODevices(); for (int i = 0; i < ioDevices.size(); i++) { - storageRoots[i] = - Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME); + storageRoots[i] = Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), + StorageConstants.STORAGE_ROOT_DIR_NAME); } createStorageRoots(); resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build(); @@ -119,7 +144,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } @Override - public LocalResource get(String relativePath) throws HyracksDataException { + public synchronized LocalResource get(String relativePath) throws HyracksDataException { LocalResource resource = resourceCache.getIfPresent(relativePath); if (resource == null) { FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath); @@ -153,7 +178,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } resourceCache.put(resource.getPath(), resource); - + indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(0); //if replication enabled, send resource metadata info to remote nodes if (isReplicationEnabled) { createReplicationJob(ReplicationOperation.REPLICATE, resourceFile); @@ -164,18 +189,16 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito public synchronized void delete(String relativePath) throws HyracksDataException { FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath); if (resourceFile.getFile().exists()) { - try { - // Invalidate before deleting the file just in case file deletion throws some exception. - // Since it's just a cache invalidation, it should not affect correctness. - resourceCache.invalidate(relativePath); - IoUtil.delete(resourceFile); - } finally { - // Regardless of successfully deleted or not, the operation should be replicated. - //if replication enabled, delete resource from remote replicas - if (isReplicationEnabled) { - createReplicationJob(ReplicationOperation.DELETE, resourceFile); - } + if (isReplicationEnabled) { + createReplicationJob(ReplicationOperation.DELETE, resourceFile); } + // delete all checkpoints + final LocalResource localResource = readLocalResource(resourceFile.getFile()); + indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete(); + // Invalidate before deleting the file just in case file deletion throws some exception. + // Since it's just a cache invalidation, it should not affect correctness. + resourceCache.invalidate(relativePath); + IoUtil.delete(resourceFile); } else { throw HyracksDataException .create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath); @@ -188,13 +211,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return ioManager.resolve(fileName); } - public Map getResources(Predicate filter) throws HyracksDataException { + public synchronized Map getResources(Predicate filter) + throws HyracksDataException { Map resourcesMap = new HashMap<>(); for (Path root : storageRoots) { - try (Stream stream = Files.find(root, RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT, - (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) { - final List resourceMetadataFiles = stream.map(Path::toFile).collect(Collectors.toList()); - for (File file : resourceMetadataFiles) { + final Collection files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER); + try { + for (File file : files) { final LocalResource localResource = PersistentLocalResourceRepository.readLocalResource(file); if (filter.test(localResource)) { resourcesMap.put(localResource.getId(), localResource); @@ -321,6 +344,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return indexes; } + public Map getPartitionResources(int partition) throws HyracksDataException { + return getResources(resource -> { + DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); + return dsResource.getPartition() == partition; + }); + } + /** * Given any index file, an absolute {@link FileReference} is returned which points to where the index of * {@code indexFile} is located. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java index 43024b6..33c6260 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.transaction.management.resource; import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.common.ILocalResourceRepository; @@ -28,16 +29,19 @@ public class PersistentLocalResourceRepositoryFactory implements ILocalResourceR private final IIOManager ioManager; private final String nodeId; private final MetadataProperties metadataProperties; + private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId, - MetadataProperties metadataProperties) { + MetadataProperties metadataProperties, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { this.ioManager = ioManager; this.nodeId = nodeId; this.metadataProperties = metadataProperties; + this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; } @Override public ILocalResourceRepository createRepository() throws HyracksDataException { - return new PersistentLocalResourceRepository(ioManager, nodeId, metadataProperties); + return new PersistentLocalResourceRepository(ioManager, nodeId, metadataProperties, + indexCheckpointManagerProvider); } }