asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [8/8] asterixdb git commit: Improve reading from and writing to Metadata pages
Date Fri, 03 Feb 2017 15:57:20 GMT
Improve reading from and writing to Metadata pages

This change introduces a new interface IComponentMetadata.
Each LSM component is associated with a metadata object
which can be used to read and write arbitrary data to
the metadata pages of components. When flushing a
component, data in its metadata component is automatically
flushed to the disk component. For merge operations,
the IO Callback is responsible for merging the components'
metadata pages.

Change-Id: Id95ef33c0a0bc1abb3fc3ecdea5611ee4acd6dfa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1476
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Integration-Tests: Ian Maxon <imaxon@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/d718dc4a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/d718dc4a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/d718dc4a

Branch: refs/heads/master
Commit: d718dc4a7fbfea5b50850f82f3f4abc01bc0e841
Parents: a8d961d
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Thu Feb 2 21:41:48 2017 -0800
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Fri Feb 3 07:56:03 2017 -0800

----------------------------------------------------------------------
 .../api/http/servlet/ConnectorApiLetTest.java   | 180 ----------------
 .../http/servlet/ConnectorApiServletTest.java   | 180 ++++++++++++++++
 .../api/http/servlet/VersionApiLetTest.java     | 120 -----------
 .../api/http/servlet/VersionApiServletTest.java | 120 +++++++++++
 .../asterix/test/dataflow/LogMarkerTest.java    |  10 +-
 .../context/CorrelatedPrefixMergePolicy.java    |  22 +-
 .../context/PrimaryIndexOperationTracker.java   |   4 +-
 .../asterix/common/dataflow/LSMIndexUtil.java   |  13 +-
 .../common/exceptions/RuntimeDataException.java |   1 +
 .../AbstractLSMIOOperationCallback.java         |  18 +-
 .../LSMBTreeIOOperationCallback.java            |  25 ++-
 .../LSMBTreeIOOperationCallbackFactory.java     |   2 +-
 .../LSMBTreeWithBuddyIOOperationCallback.java   |  15 +-
 ...TreeWithBuddyIOOperationCallbackFactory.java |   2 +-
 .../LSMInvertedIndexIOOperationCallback.java    |  18 +-
 ...InvertedIndexIOOperationCallbackFactory.java |   2 +-
 .../LSMRTreeIOOperationCallback.java            |  15 +-
 .../LSMRTreeIOOperationCallbackFactory.java     |   2 +-
 .../PrimaryIndexLogMarkerCallback.java          |  83 ++++++-
 .../indexing/ExternalFileIndexAccessor.java     |  26 +--
 asterixdb/asterix-lang-sqlpp/pom.xml            |  18 +-
 .../pom.xml                                     |  10 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |   8 +-
 .../management/ReplicationManager.java          |  50 ++---
 .../ExternalBTreeLocalResourceMetadata.java     |  15 +-
 ...rnalBTreeWithBuddyLocalResourceMetadata.java |  20 +-
 .../ExternalRTreeLocalResourceMetadata.java     |  11 +-
 .../resource/LSMBTreeLocalResourceMetadata.java |  22 +-
 .../LSMInvertedIndexLocalResourceMetadata.java  |   4 +-
 .../resource/LSMRTreeLocalResourceMetadata.java |   4 +-
 .../hyracks/algebricks/common/utils/Pair.java   |  12 +-
 .../hyracks/algebricks/common/utils/Triple.java |  11 +-
 .../physical/AbstractStableSortPOperator.java   |   1 -
 .../physical/OneToOneExchangePOperator.java     |   2 +-
 .../physical/RandomMergeExchangePOperator.java  |   2 +-
 .../algebricks/algebricks-rewriter/pom.xml      |   7 +-
 .../hyracks/api/exceptions/ErrorCode.java       |   1 +
 .../src/main/resources/errormsg/en.properties   |  21 +-
 .../hyracks/data/std/api/IValueReference.java   |   6 +-
 .../data/std/primitive/LongPointable.java       |  21 +-
 .../data/std/util/ArrayBackedValueStorage.java  |  20 ++
 .../hyracks/data/std/util/GrowableArray.java    |  29 +++
 .../tests/integration/JobFailureTest.java       |   9 +-
 ...onOnCreatePushRuntimeOperatorDescriptor.java |   5 +
 .../http/server/ChunkedNettyOutputStream.java   |  17 +-
 .../am/btree/dataflow/BTreeDataflowHelper.java  |  18 +-
 .../storage/am/btree/util/BTreeUtils.java       |  10 +-
 .../AppendOnlyLinkedMetadataPageManager.java    |  17 +-
 .../dataflow/ExternalBTreeDataflowHelper.java   |   2 +-
 .../ExternalBTreeWithBuddyDataflowHelper.java   |   2 +-
 .../btree/dataflow/LSMBTreeDataflowHelper.java  |   2 +-
 .../am/lsm/btree/impls/ExternalBTree.java       | 104 ++++-----
 .../lsm/btree/impls/ExternalBTreeOpContext.java |  15 +-
 .../lsm/btree/impls/ExternalBTreeWithBuddy.java | 118 +++++-----
 .../impls/ExternalBTreeWithBuddyOpContext.java  |  15 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java    | 166 +++++++-------
 .../lsm/btree/impls/LSMBTreeDiskComponent.java  |  12 +-
 .../impls/LSMBTreeDiskComponentFactory.java     |  17 +-
 .../lsm/btree/impls/LSMBTreeFlushOperation.java |  18 +-
 .../btree/impls/LSMBTreeMemoryComponent.java    |  16 +-
 .../lsm/btree/impls/LSMBTreeMergeOperation.java |  10 +-
 .../am/lsm/btree/impls/LSMBTreeOpContext.java   |  22 +-
 .../impls/LSMBTreeWithBuddyDiskComponent.java   |  12 +-
 .../LSMBTreeWithBuddyDiskComponentFactory.java  |  34 +--
 .../impls/LSMBTreeWithBuddyMemoryComponent.java |  14 +-
 .../impls/LSMBTreeWithBuddyMergeOperation.java  |  24 +--
 .../hyracks-storage-am-lsm-common/pom.xml       |   4 +
 .../am/lsm/common/api/IComponentMetadata.java   |  54 +++++
 .../am/lsm/common/api/ILSMComponent.java        |  87 +++++++-
 .../am/lsm/common/api/ILSMComponentFactory.java |  31 ---
 .../am/lsm/common/api/ILSMComponentFilter.java  |  13 +-
 .../common/api/ILSMComponentFilterFactory.java  |   3 +-
 .../api/ILSMComponentFilterFrameFactory.java    |   3 +-
 .../common/api/ILSMComponentFilterManager.java  |   8 +-
 .../am/lsm/common/api/ILSMDiskComponent.java    |  49 +++++
 .../common/api/ILSMDiskComponentFactory.java    |  36 ++++
 .../storage/am/lsm/common/api/ILSMHarness.java  |  30 ++-
 .../am/lsm/common/api/ILSMIOOperation.java      |  15 +-
 .../lsm/common/api/ILSMIOOperationCallback.java |  13 +-
 .../api/ILSMIOOperationCallbackFactory.java     |   2 +-
 .../api/ILSMIOOperationCallbackProvider.java    |   3 +-
 .../storage/am/lsm/common/api/ILSMIndex.java    |  92 +++++++-
 .../am/lsm/common/api/ILSMIndexAccessor.java    |  41 ++--
 .../common/api/ILSMIndexAccessorInternal.java   |  42 ----
 .../am/lsm/common/api/ILSMIndexInternal.java    |  98 ---------
 .../common/api/ILSMIndexOperationContext.java   |  20 +-
 .../am/lsm/common/api/ILSMMemoryComponent.java  |  77 +++++++
 .../am/lsm/common/api/ILSMMergePolicy.java      |   7 +-
 .../lsm/common/api/ILSMMergePolicyFactory.java  |   8 +-
 .../am/lsm/common/api/ILSMOperationTracker.java |   8 +-
 .../storage/am/lsm/common/api/ITwoPCIndex.java  |  10 +-
 .../am/lsm/common/api/IVirtualBufferCache.java  |   8 +-
 .../common/api/IVirtualBufferCacheProvider.java |   5 +-
 .../common/impls/AbstractDiskLSMComponent.java  | 109 ----------
 .../lsm/common/impls/AbstractLSMComponent.java  |  33 +--
 .../common/impls/AbstractLSMDiskComponent.java  |  99 +++++++++
 .../am/lsm/common/impls/AbstractLSMIndex.java   |  66 +++---
 .../impls/AbstractLSMMemoryComponent.java       | 210 ++++++++++++++++++
 .../impls/AbstractMemoryLSMComponent.java       | 214 -------------------
 .../am/lsm/common/impls/BTreeFactory.java       |   3 +-
 .../BlockingIOOperationCallbackWrapper.java     |   8 +-
 .../lsm/common/impls/ConstantMergePolicy.java   |  21 +-
 .../lsm/common/impls/DiskComponentMetadata.java |  56 +++++
 .../lsm/common/impls/ExternalIndexHarness.java  |  39 ++--
 .../am/lsm/common/impls/IndexFactory.java       |   3 +-
 .../common/impls/LSMComponentFilterFactory.java |   2 +-
 .../common/impls/LSMComponentFilterManager.java |   8 +-
 .../storage/am/lsm/common/impls/LSMHarness.java |  57 +++--
 .../lsm/common/impls/LSMTreeIndexAccessor.java  |  10 +-
 .../common/impls/MemoryComponentMetadata.java   |  87 ++++++++
 .../common/impls/NoOpIOOperationCallback.java   |  12 +-
 .../am/lsm/common/impls/PrefixMergePolicy.java  |  27 +--
 .../lsm/common/impls/ThreadCountingTracker.java |  13 +-
 .../lsm/common/utils/ComponentMetadataUtil.java | 118 ++++++++++
 .../pom.xml                                     |   7 +-
 .../lsm/invertedindex/api/IInvertedIndex.java   |  12 +-
 .../LSMInvertedIndexDataflowHelper.java         |   2 +-
 ...rtitionedLSMInvertedIndexDataflowHelper.java |   2 +-
 .../invertedindex/impls/LSMInvertedIndex.java   | 165 +++++++-------
 .../impls/LSMInvertedIndexAccessor.java         |  14 +-
 .../impls/LSMInvertedIndexDiskComponent.java    |   9 +-
 .../LSMInvertedIndexDiskComponentFactory.java   |  22 +-
 .../impls/LSMInvertedIndexFlushOperation.java   |   8 +-
 .../impls/LSMInvertedIndexMemoryComponent.java  |   6 +-
 .../impls/LSMInvertedIndexMergeOperation.java   |  10 +-
 .../impls/LSMInvertedIndexOpContext.java        |  22 +-
 .../inmemory/InMemoryInvertedIndex.java         |   7 +-
 .../PartitionedInMemoryInvertedIndex.java       |  16 +-
 .../ondisk/OnDiskInvertedIndex.java             |  51 +++--
 .../ondisk/OnDiskInvertedIndexFactory.java      |   3 +-
 .../ondisk/PartitionedOnDiskInvertedIndex.java  |   6 +-
 .../PartitionedOnDiskInvertedIndexFactory.java  |  12 +-
 .../invertedindex/util/InvertedIndexUtils.java  |  89 ++++----
 .../dataflow/ExternalRTreeDataflowHelper.java   |   2 +-
 .../rtree/dataflow/LSMRTreeDataflowHelper.java  |   2 +-
 ...RTreeWithAntiMatterTuplesDataflowHelper.java |   2 +-
 .../am/lsm/rtree/impls/AbstractLSMRTree.java    |  87 ++++----
 .../am/lsm/rtree/impls/ExternalRTree.java       | 105 ++++-----
 .../lsm/rtree/impls/ExternalRTreeOpContext.java |  21 +-
 .../storage/am/lsm/rtree/impls/LSMRTree.java    | 135 ++++++------
 .../lsm/rtree/impls/LSMRTreeDiskComponent.java  |   7 +-
 .../impls/LSMRTreeDiskComponentFactory.java     |  17 +-
 .../lsm/rtree/impls/LSMRTreeFlushOperation.java |   8 +-
 .../rtree/impls/LSMRTreeMemoryComponent.java    |   6 +-
 .../lsm/rtree/impls/LSMRTreeMergeOperation.java |  10 +-
 .../am/lsm/rtree/impls/LSMRTreeOpContext.java   |  25 +--
 .../impls/LSMRTreeWithAntiMatterTuples.java     |  76 +++----
 ...ithAntiMatterTuplesDiskComponentFactory.java |  17 +-
 .../am/lsm/rtree/impls/RTreeFactory.java        |   3 +-
 .../common/buffercache/IBufferCache.java        |  42 ++--
 .../storage/am/btree/BTreeExamplesTest.java     |   7 +-
 .../btree/multithread/BTreeMultiThreadTest.java |  19 +-
 .../storage/am/lsm/common/DummyTreeFactory.java |   3 +-
 153 files changed, 2677 insertions(+), 2174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java
deleted file mode 100644
index d9a0a79..0000000
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.api.http.servlet;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.asterix.api.http.server.ConnectorApiServlet;
-import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.JSONDeserializerForTypes;
-import org.apache.asterix.test.runtime.SqlppExecutionTest;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.io.ManagedFileSplit;
-import org.apache.hyracks.http.api.IServletRequest;
-import org.apache.hyracks.http.api.IServletResponse;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpMethod;
-import junit.extensions.PA;
-
-public class ConnectorApiLetTest {
-
-    @Test
-    public void testGet() throws Exception {
-        // Starts test asterixdb cluster.
-        SqlppExecutionTest.setUp();
-
-        // Configures a test connector api servlet.
-        ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" });
-        Map<String, NodeControllerInfo> nodeMap = new HashMap<>();
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        PrintWriter outputWriter = new PrintWriter(outputStream);
-
-        // Creates mocks.
-        IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
-        NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class);
-        NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class);
-        IServletRequest mockRequest = mock(IServletRequest.class);
-        IServletResponse mockResponse = mock(IServletResponse.class);
-        FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class);
-
-        // Put stuff in let map
-        let.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
-        // Sets up mock returns.
-        when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest);
-        when(mockHttpRequest.method()).thenReturn(HttpMethod.GET);
-        when(mockRequest.getParameter("dataverseName")).thenReturn("Metadata");
-        when(mockRequest.getParameter("datasetName")).thenReturn("Dataset");
-        when(mockResponse.writer()).thenReturn(outputWriter);
-        when(mockHcc.getNodeControllerInfos()).thenReturn(nodeMap);
-        when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099));
-        when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099));
-
-        // Calls ConnectorAPIServlet.formResponseObject.
-        nodeMap.put("asterix_nc1", mockInfo1);
-        nodeMap.put("asterix_nc2", mockInfo2);
-        let.handle(mockRequest, mockResponse);
-
-        // Constructs the actual response.
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode actualResponse = (ObjectNode) om.readTree(outputStream.toString());
-
-        // Checks the temp-or-not, primary key, data type of the dataset.
-        boolean temp = actualResponse.get("temp").asBoolean();
-        Assert.assertFalse(temp);
-        String primaryKey = actualResponse.get("keys").asText();
-        Assert.assertEquals("DataverseName,DatasetName", primaryKey);
-        ARecordType recordType = (ARecordType) JSONDeserializerForTypes.convertFromJSON(actualResponse.get("type"));
-        Assert.assertEquals(getMetadataRecordType("Metadata", "Dataset"), recordType);
-
-        // Checks the correctness of results.
-        ArrayNode splits = (ArrayNode) actualResponse.get("splits");
-        String path = (splits.get(0)).get("path").asText();
-        Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset"));
-
-        // Tears down the asterixdb cluster.
-        SqlppExecutionTest.tearDown();
-    }
-
-    @Test
-    public void testFormResponseObject() throws Exception {
-        ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" });
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode actualResponse = om.createObjectNode();
-        FileSplit[] splits = new FileSplit[2];
-        splits[0] = new ManagedFileSplit("asterix_nc1", "foo1");
-        splits[1] = new ManagedFileSplit("asterix_nc2", "foo2");
-        Map<String, NodeControllerInfo> nodeMap = new HashMap<>();
-        NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class);
-        NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class);
-
-        // Sets up mock returns.
-        when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099));
-        when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099));
-
-        String[] fieldNames = new String[] { "a1", "a2" };
-        IAType[] fieldTypes = new IAType[] { BuiltinType.ABOOLEAN, BuiltinType.ADAYTIMEDURATION };
-        ARecordType recordType = new ARecordType("record", fieldNames, fieldTypes, true);
-        String primaryKey = "a1";
-
-        // Calls ConnectorAPIServlet.formResponseObject.
-        nodeMap.put("asterix_nc1", mockInfo1);
-        nodeMap.put("asterix_nc2", mockInfo2);
-        PA.invokeMethod(let,
-                "formResponseObject(" + ObjectNode.class.getName() + ", " + FileSplit.class.getName() + "[], "
-                        + ARecordType.class.getName() + ", " + String.class.getName() + ", boolean, "
-                        + Map.class.getName() + ")",
-                actualResponse, splits, recordType, primaryKey, true, nodeMap);
-        // Constructs expected response.
-        ObjectNode expectedResponse = om.createObjectNode();
-        expectedResponse.put("temp", true);
-        expectedResponse.put("keys", primaryKey);
-        expectedResponse.set("type", recordType.toJSON());
-        ArrayNode splitsArray = om.createArrayNode();
-        ObjectNode element1 = om.createObjectNode();
-        element1.put("ip", "127.0.0.1");
-        element1.put("path", splits[0].getPath());
-        ObjectNode element2 = om.createObjectNode();
-        element2.put("ip", "127.0.0.2");
-        element2.put("path", splits[1].getPath());
-        splitsArray.add(element1);
-        splitsArray.add(element2);
-        expectedResponse.set("splits", splitsArray);
-
-        // Checks results.
-        Assert.assertEquals(actualResponse.toString(), expectedResponse.toString());
-    }
-
-    private ARecordType getMetadataRecordType(String dataverseName, String datasetName) throws Exception {
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        // Retrieves file splits of the dataset.
-        MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider());
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
-        ARecordType recordType =
-                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-        // Metadata transaction commits.
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        return recordType;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
new file mode 100644
index 0000000..7da3b32
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.api.http.servlet;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.api.http.server.ConnectorApiServlet;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.JSONDeserializerForTypes;
+import org.apache.asterix.test.runtime.SqlppExecutionTest;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.ManagedFileSplit;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import junit.extensions.PA;
+
+public class ConnectorApiServletTest {
+
+    @Test
+    public void testGet() throws Exception {
+        // Starts test asterixdb cluster.
+        SqlppExecutionTest.setUp();
+
+        // Configures a test connector api servlet.
+        ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" });
+        Map<String, NodeControllerInfo> nodeMap = new HashMap<>();
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        PrintWriter outputWriter = new PrintWriter(outputStream);
+
+        // Creates mocks.
+        IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
+        NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class);
+        NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class);
+        IServletRequest mockRequest = mock(IServletRequest.class);
+        IServletResponse mockResponse = mock(IServletResponse.class);
+        FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class);
+
+        // Put stuff in let map
+        let.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
+        // Sets up mock returns.
+        when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest);
+        when(mockHttpRequest.method()).thenReturn(HttpMethod.GET);
+        when(mockRequest.getParameter("dataverseName")).thenReturn("Metadata");
+        when(mockRequest.getParameter("datasetName")).thenReturn("Dataset");
+        when(mockResponse.writer()).thenReturn(outputWriter);
+        when(mockHcc.getNodeControllerInfos()).thenReturn(nodeMap);
+        when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099));
+        when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099));
+
+        // Calls ConnectorAPIServlet.formResponseObject.
+        nodeMap.put("asterix_nc1", mockInfo1);
+        nodeMap.put("asterix_nc2", mockInfo2);
+        let.handle(mockRequest, mockResponse);
+
+        // Constructs the actual response.
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode actualResponse = (ObjectNode) om.readTree(outputStream.toString());
+
+        // Checks the temp-or-not, primary key, data type of the dataset.
+        boolean temp = actualResponse.get("temp").asBoolean();
+        Assert.assertFalse(temp);
+        String primaryKey = actualResponse.get("keys").asText();
+        Assert.assertEquals("DataverseName,DatasetName", primaryKey);
+        ARecordType recordType = (ARecordType) JSONDeserializerForTypes.convertFromJSON(actualResponse.get("type"));
+        Assert.assertEquals(getMetadataRecordType("Metadata", "Dataset"), recordType);
+
+        // Checks the correctness of results.
+        ArrayNode splits = (ArrayNode) actualResponse.get("splits");
+        String path = (splits.get(0)).get("path").asText();
+        Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset"));
+
+        // Tears down the asterixdb cluster.
+        SqlppExecutionTest.tearDown();
+    }
+
+    @Test
+    public void testFormResponseObject() throws Exception {
+        ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" });
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode actualResponse = om.createObjectNode();
+        FileSplit[] splits = new FileSplit[2];
+        splits[0] = new ManagedFileSplit("asterix_nc1", "foo1");
+        splits[1] = new ManagedFileSplit("asterix_nc2", "foo2");
+        Map<String, NodeControllerInfo> nodeMap = new HashMap<>();
+        NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class);
+        NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class);
+
+        // Sets up mock returns.
+        when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099));
+        when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099));
+
+        String[] fieldNames = new String[] { "a1", "a2" };
+        IAType[] fieldTypes = new IAType[] { BuiltinType.ABOOLEAN, BuiltinType.ADAYTIMEDURATION };
+        ARecordType recordType = new ARecordType("record", fieldNames, fieldTypes, true);
+        String primaryKey = "a1";
+
+        // Calls ConnectorAPIServlet.formResponseObject.
+        nodeMap.put("asterix_nc1", mockInfo1);
+        nodeMap.put("asterix_nc2", mockInfo2);
+        PA.invokeMethod(let,
+                "formResponseObject(" + ObjectNode.class.getName() + ", " + FileSplit.class.getName() + "[], "
+                        + ARecordType.class.getName() + ", " + String.class.getName() + ", boolean, "
+                        + Map.class.getName() + ")",
+                actualResponse, splits, recordType, primaryKey, true, nodeMap);
+        // Constructs expected response.
+        ObjectNode expectedResponse = om.createObjectNode();
+        expectedResponse.put("temp", true);
+        expectedResponse.put("keys", primaryKey);
+        expectedResponse.set("type", recordType.toJSON());
+        ArrayNode splitsArray = om.createArrayNode();
+        ObjectNode element1 = om.createObjectNode();
+        element1.put("ip", "127.0.0.1");
+        element1.put("path", splits[0].getPath());
+        ObjectNode element2 = om.createObjectNode();
+        element2.put("ip", "127.0.0.2");
+        element2.put("path", splits[1].getPath());
+        splitsArray.add(element1);
+        splitsArray.add(element2);
+        expectedResponse.set("splits", splitsArray);
+
+        // Checks results.
+        Assert.assertEquals(actualResponse.toString(), expectedResponse.toString());
+    }
+
+    private ARecordType getMetadataRecordType(String dataverseName, String datasetName) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        // Retrieves file splits of the dataset.
+        MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider());
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        ARecordType recordType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        // Metadata transaction commits.
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        return recordType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java
deleted file mode 100644
index 619e7a5..0000000
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.api.http.servlet;
-
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.asterix.api.http.server.VersionApiServlet;
-import org.apache.asterix.common.config.BuildProperties;
-import org.apache.asterix.runtime.utils.AppContextInfo;
-import org.apache.asterix.test.runtime.SqlppExecutionTest;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.http.api.IServletRequest;
-import org.apache.hyracks.http.api.IServletResponse;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpMethod;
-
-public class VersionApiLetTest {
-
-    @Test
-    public void testGet() throws Exception {
-        // Starts test asterixdb cluster.
-        SqlppExecutionTest.setUp();
-
-        // Configures a test version api servlet.
-        VersionApiServlet servlet = new VersionApiServlet(new ConcurrentHashMap<>(), new String[] { "/" });
-        Map<String, String> propMap = new HashMap<>();
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        PrintWriter outputWriter = new PrintWriter(outputStream);
-
-        // Creates mocks.
-        AppContextInfo mockCtx = mock(AppContextInfo.class);
-        IServletRequest mockRequest = mock(IServletRequest.class);
-        IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
-        IServletResponse mockResponse = mock(IServletResponse.class);
-        BuildProperties mockProperties = mock(BuildProperties.class);
-        FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class);
-
-        // Put stuff in let map
-        servlet.ctx().put(HYRACKS_CONNECTION_ATTR, mockHcc);
-        servlet.ctx().put(ASTERIX_BUILD_PROP_ATTR, mockCtx);
-        // Sets up mock returns.
-        when(mockResponse.writer()).thenReturn(outputWriter);
-        when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest);
-        when(mockHttpRequest.method()).thenReturn(HttpMethod.GET);
-        when(mockCtx.getBuildProperties()).thenReturn(mockProperties);
-        when(mockProperties.getAllProps()).thenReturn(propMap);
-
-        propMap.put("git.build.user.email", "foo@bar.baz");
-        propMap.put("git.build.host", "fulliautomatix");
-        propMap.put("git.dirty", "true");
-        propMap.put("git.remote.origin.url", "git@github.com:apache/incubator-asterixdb.git");
-        propMap.put("git.closest.tag.name", "asterix-0.8.7-incubating");
-        propMap.put("git.commit.id.describe-short", "asterix-0.8.7-incubating-19-dirty");
-        propMap.put("git.commit.user.email", "foo@bar.baz");
-        propMap.put("git.commit.time", "21.10.2015 @ 23:36:41 PDT");
-        propMap.put("git.commit.message.full",
-                "ASTERIXDB-1045: fix log file reading during recovery\n\nChange-Id: Ic83ee1dd2d7ba88180c25f4ec6c7aa8d0a5a7162\nReviewed-on: https://asterix-gerrit.ics.uci.edu/465\nTested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>");
-        propMap.put("git.build.version", "0.8.8-SNAPSHOT");
-        propMap.put("git.commit.message.short", "ASTERIXDB-1045: fix log file reading during recovery");
-        propMap.put("git.commit.id.abbrev", "e1dad19");
-        propMap.put("git.branch", "foo/bar");
-        propMap.put("git.build.user.name", "Asterix");
-        propMap.put("git.closest.tag.commit.count", "19");
-        propMap.put("git.commit.id.describe", "asterix-0.8.7-incubating-19-ge1dad19-dirty");
-        propMap.put("git.commit.id", "e1dad1984640517366a7e73e323c9de27b0676f7");
-        propMap.put("git.tags", "");
-        propMap.put("git.build.time", "22.10.2015 @ 17:11:07 PDT");
-        propMap.put("git.commit.user.name", "Obelix");
-
-        // Calls VersionAPIServlet.formResponseObject.
-        servlet.handle(mockRequest, mockResponse);
-
-        // Constructs the actual response.
-
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode actualResponse = (ObjectNode) om.readTree(outputStream.toByteArray());
-        ObjectNode expectedResponse = om.createObjectNode();
-        for (Map.Entry<String, String> e : propMap.entrySet()) {
-            expectedResponse.put(e.getKey(), e.getValue());
-        }
-
-        // Checks the response contains all the expected keys.
-        Assert.assertEquals(actualResponse.toString(), expectedResponse.toString());
-
-        // Tears down the asterixdb cluster.
-        SqlppExecutionTest.tearDown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
new file mode 100644
index 0000000..7fed010
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.api.http.servlet;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.api.http.server.VersionApiServlet;
+import org.apache.asterix.common.config.BuildProperties;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.test.runtime.SqlppExecutionTest;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+
+public class VersionApiServletTest {
+
+    @Test
+    public void testGet() throws Exception {
+        // Starts test asterixdb cluster.
+        SqlppExecutionTest.setUp();
+
+        // Configures a test version api servlet.
+        VersionApiServlet servlet = new VersionApiServlet(new ConcurrentHashMap<>(), new String[] { "/" });
+        Map<String, String> propMap = new HashMap<>();
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        PrintWriter outputWriter = new PrintWriter(outputStream);
+
+        // Creates mocks.
+        AppContextInfo mockCtx = mock(AppContextInfo.class);
+        IServletRequest mockRequest = mock(IServletRequest.class);
+        IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
+        IServletResponse mockResponse = mock(IServletResponse.class);
+        BuildProperties mockProperties = mock(BuildProperties.class);
+        FullHttpRequest mockHttpRequest = mock(FullHttpRequest.class);
+
+        // Put stuff in let map
+        servlet.ctx().put(HYRACKS_CONNECTION_ATTR, mockHcc);
+        servlet.ctx().put(ASTERIX_BUILD_PROP_ATTR, mockCtx);
+        // Sets up mock returns.
+        when(mockResponse.writer()).thenReturn(outputWriter);
+        when(mockRequest.getHttpRequest()).thenReturn(mockHttpRequest);
+        when(mockHttpRequest.method()).thenReturn(HttpMethod.GET);
+        when(mockCtx.getBuildProperties()).thenReturn(mockProperties);
+        when(mockProperties.getAllProps()).thenReturn(propMap);
+
+        propMap.put("git.build.user.email", "foo@bar.baz");
+        propMap.put("git.build.host", "fulliautomatix");
+        propMap.put("git.dirty", "true");
+        propMap.put("git.remote.origin.url", "git@github.com:apache/incubator-asterixdb.git");
+        propMap.put("git.closest.tag.name", "asterix-0.8.7-incubating");
+        propMap.put("git.commit.id.describe-short", "asterix-0.8.7-incubating-19-dirty");
+        propMap.put("git.commit.user.email", "foo@bar.baz");
+        propMap.put("git.commit.time", "21.10.2015 @ 23:36:41 PDT");
+        propMap.put("git.commit.message.full",
+                "ASTERIXDB-1045: fix log file reading during recovery\n\nChange-Id: Ic83ee1dd2d7ba88180c25f4ec6c7aa8d0a5a7162\nReviewed-on: https://asterix-gerrit.ics.uci.edu/465\nTested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>");
+        propMap.put("git.build.version", "0.8.8-SNAPSHOT");
+        propMap.put("git.commit.message.short", "ASTERIXDB-1045: fix log file reading during recovery");
+        propMap.put("git.commit.id.abbrev", "e1dad19");
+        propMap.put("git.branch", "foo/bar");
+        propMap.put("git.build.user.name", "Asterix");
+        propMap.put("git.closest.tag.commit.count", "19");
+        propMap.put("git.commit.id.describe", "asterix-0.8.7-incubating-19-ge1dad19-dirty");
+        propMap.put("git.commit.id", "e1dad1984640517366a7e73e323c9de27b0676f7");
+        propMap.put("git.tags", "");
+        propMap.put("git.build.time", "22.10.2015 @ 17:11:07 PDT");
+        propMap.put("git.commit.user.name", "Obelix");
+
+        // Calls VersionAPIServlet.formResponseObject.
+        servlet.handle(mockRequest, mockResponse);
+
+        // Constructs the actual response.
+
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode actualResponse = (ObjectNode) om.readTree(outputStream.toByteArray());
+        ObjectNode expectedResponse = om.createObjectNode();
+        for (Map.Entry<String, String> e : propMap.entrySet()) {
+            expectedResponse.put(e.getKey(), e.getValue());
+        }
+
+        // Checks the response contains all the expected keys.
+        Assert.assertEquals(actualResponse.toString(), expectedResponse.toString());
+
+        // Tears down the asterixdb cluster.
+        SqlppExecutionTest.tearDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index edd1848..1467dbf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -51,6 +51,7 @@ import org.apache.hyracks.api.test.CountAnswer;
 import org.apache.hyracks.api.test.FrameWriterTestUtils;
 import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
 import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
@@ -58,6 +59,7 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -68,8 +70,8 @@ public class LogMarkerTest {
     private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
     private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
             new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
-    private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC,
-            GenerationFunction.DETERMINISTIC };
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
     private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
     private static final ARecordType META_TYPE = null;
     private static final GenerationFunction[] META_GEN_FUNCTION = null;
@@ -146,7 +148,9 @@ public class LogMarkerTest {
                         KEY_INDICATORS_LIST);
                 dataflowHelper.open();
                 LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
-                long lsn = btree.getMostRecentMarkerLSN();
+                LongPointable longPointable = LongPointable.FACTORY.createPointable();
+                ComponentMetadataUtil.get(btree, ComponentMetadataUtil.MARKER_LSN_KEY, longPointable);
+                long lsn = longPointable.getLong();
                 int numOfMarkers = 0;
                 LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
                 long expectedMarkerId = markerId - 1;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index 70339f3..e16ce78 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -30,12 +30,12 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
 
 public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
 
@@ -60,18 +60,18 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
         // all such components for which the sum of their sizes exceeds MaxMrgCompSz.  Schedule a merge of those components into a new component.
         // 2.  If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt.  If so, schedule
         // a merge all of the current candidates into a new single component.
-        List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
+        List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
         // Reverse the components order so that we look at components from oldest to newest.
         Collections.reverse(immutableComponents);
 
-        for (ILSMComponent c : immutableComponents) {
+        for (ILSMDiskComponent c : immutableComponents) {
             if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
                 return;
             }
         }
         if (fullMergeIsRequested) {
-            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndexAccessor accessor =
+                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             accessor.scheduleFullMerge(index.getIOOperationCallback());
             return;
         }
@@ -89,7 +89,7 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
 
         for (int i = 0; i < minNumComponents; i++) {
             ILSMComponent c = immutableComponents.get(i);
-            long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+            long componentSize = ((ILSMDiskComponent) c).getComponentSize();
             if (componentSize > maxMergableComponentSize) {
                 startIndex = i;
                 totalSize = 0;
@@ -101,20 +101,20 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
                     || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
 
                 for (ILSMIndex lsmIndex : indexes) {
-                    List<ILSMComponent> reversedImmutableComponents = new ArrayList<ILSMComponent>(
-                            lsmIndex.getImmutableComponents());
+                    List<ILSMDiskComponent> reversedImmutableComponents =
+                            new ArrayList<>(lsmIndex.getImmutableComponents());
                     // Reverse the components order so that we look at components from oldest to newest.
                     Collections.reverse(reversedImmutableComponents);
 
-                    List<ILSMComponent> mergableComponents = new ArrayList<ILSMComponent>();
+                    List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
                     for (int j = startIndex + 1; j <= i; j++) {
                         mergableComponents.add(reversedImmutableComponents.get(j));
                     }
                     // Reverse the components order back to its original order
                     Collections.reverse(mergableComponents);
 
-                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
-                            NoOpOperationCallback.INSTANCE);
+                    ILSMIndexAccessor accessor =
+                            lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                     accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
                 }
                 break;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 6350d73..7fca039 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -35,7 +35,6 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -101,8 +100,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
 
         if (!flushOnExit) {
             for (ILSMIndex lsmIndex : indexes) {
-                ILSMIndexInternal lsmIndexInternal = (ILSMIndexInternal) lsmIndex;
-                if (lsmIndexInternal.hasFlushRequestForCurrentMutableComponent()) {
+                if (lsmIndex.hasFlushRequestForCurrentMutableComponent()) {
                     needsFlush = true;
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
index cd40179..04090bb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
@@ -21,7 +21,8 @@ package org.apache.asterix.common.dataflow;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
 public class LSMIndexUtil {
@@ -33,18 +34,18 @@ public class LSMIndexUtil {
             //prevent transactions from incorrectly setting the first LSN on a modified component by checking the index is still empty
             synchronized (lsmIndex.getOperationTracker()) {
                 if (lsmIndex.isCurrentMutableComponentEmpty()) {
-                    AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
-                            .getIOOperationCallback();
+                    AbstractLSMIOOperationCallback ioOpCallback =
+                            (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
                     ioOpCallback.setFirstLSN(logManager.getAppendLSN());
                 }
             }
         }
     }
 
-    public static long getComponentFileLSNOffset(AbstractLSMIndex lsmIndex, ILSMComponent lsmComponent,
+    public static long getComponentFileLSNOffset(ILSMIndex lsmIndex, ILSMDiskComponent lsmComponent,
             String componentFilePath) throws HyracksDataException {
-        AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
-                .getIOOperationCallback();
+        AbstractLSMIOOperationCallback ioOpCallback =
+                (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
         return ioOpCallback.getComponentFileLSNOffset(lsmComponent, componentFilePath);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java
index fded0d9..0c099fb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/RuntimeDataException.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class RuntimeDataException extends HyracksDataException {
+    private static final long serialVersionUID = 1L;
 
     public RuntimeDataException(int errorCode, Serializable... params) {
         super(ErrorCode.ASTERIX, errorCode, ErrorCode.getErrorMessage(errorCode), params);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 29af7c9..f903b65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -27,13 +27,13 @@ import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 
 // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
 public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
-    public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN"
-            .getBytes());
+    public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes());
     public static final long INVALID = -1L;
 
     // First LSN per mutable component
@@ -78,7 +78,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
     }
 
     @Override
-    public void afterFinalize(LSMOperationType opType, ILSMComponent newComponent) {
+    public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) {
         // The operation was complete and the next I/O operation for the LSM index didn't start yet
         if (opType == LSMOperationType.FLUSH && newComponent != null) {
             synchronized (this) {
@@ -93,15 +93,11 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
         }
     }
 
-    public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException;
+    public abstract long getComponentLSN(List<? extends ILSMComponent> oldComponents) throws HyracksDataException;
 
-    public void putLSNIntoMetadata(ITreeIndex treeIndex, List<ILSMComponent> oldComponents)
+    public void putLSNIntoMetadata(ILSMDiskComponent index, List<ILSMComponent> oldComponents)
             throws HyracksDataException {
-        byte[] lsn = new byte[Long.BYTES];
-        LongPointable.setLong(lsn, 0, getComponentLSN(oldComponents));
-        IMetadataPageManager metadataPageManager = (IMetadataPageManager) treeIndex.getPageManager();
-        metadataPageManager.put(metadataPageManager.createMetadataFrame(), LSN_KEY, new MutableArrayValueReference(
-                lsn));
+        index.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
     }
 
     public static long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException {
@@ -144,6 +140,6 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
      *         otherwise {@link IMetadataPageManager#INVALID_LSN_OFFSET}.
      * @throws HyracksDataException
      */
-    public abstract long getComponentFileLSNOffset(ILSMComponent component, String componentFilePath)
+    public abstract long getComponentFileLSNOffset(ILSMDiskComponent component, String componentFilePath)
             throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 142b84f..173c962 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -22,12 +22,15 @@ package org.apache.asterix.common.ioopcallbacks;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
 
 public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
@@ -36,16 +39,24 @@ public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback
     }
 
     @Override
-    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
-            throws HyracksDataException {
+    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents,
+            ILSMDiskComponent newComponent) throws HyracksDataException {
+        //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
         if (newComponent != null) {
             LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) newComponent;
-            putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
+            putLSNIntoMetadata(btreeComponent, oldComponents);
+            if (opType == LSMOperationType.MERGE) {
+                LongPointable markerLsn = LongPointable.FACTORY
+                        .createPointable(ComponentMetadataUtil.getLong(oldComponents.get(0).getMetadata(),
+                                ComponentMetadataUtil.MARKER_LSN_KEY, ComponentMetadataUtil.NOT_FOUND));
+                btreeComponent.getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, markerLsn);
+            }
+
         }
     }
 
     @Override
-    public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
+    public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation. --> moves the flush pointer
             // Flush operation of an LSM index are executed sequentially.
@@ -64,12 +75,12 @@ public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback
     }
 
     @Override
-    public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
             throws HyracksDataException {
         if (diskComponentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) {
             LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) diskComponent;
-            IMetadataPageManager metadataPageManager = (IMetadataPageManager) btreeComponent.getBTree()
-                    .getPageManager();
+            IMetadataPageManager metadataPageManager =
+                    (IMetadataPageManager) btreeComponent.getBTree().getPageManager();
             return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
         }
         return INVALID;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 21882dd..322b5ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -32,7 +32,7 @@ public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallba
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback() {
+    public ILSMIOOperationCallback createIoOpCallback() {
         return new LSMBTreeIOOperationCallback();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index a8c545d..6c987d6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -25,21 +25,22 @@ import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyDiskComponent;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 
 public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperationCallback {
 
     @Override
-    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
-            throws HyracksDataException {
+    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents,
+            ILSMDiskComponent newComponent) throws HyracksDataException {
         if (newComponent != null) {
             LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) newComponent;
-            putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
+            putLSNIntoMetadata(btreeComponent, oldComponents);
         }
     }
 
     @Override
-    public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
+    public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation <Will never happen currently as Btree with buddy btree is only used with external datasets>
             synchronized (this) {
@@ -57,12 +58,12 @@ public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperation
     }
 
     @Override
-    public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
             throws HyracksDataException {
         if (diskComponentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)) {
             LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) diskComponent;
-            IMetadataPageManager metadataPageManager = (IMetadataPageManager) btreeComponent.getBTree()
-                    .getPageManager();
+            IMetadataPageManager metadataPageManager =
+                    (IMetadataPageManager) btreeComponent.getBTree().getPageManager();
             return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
         }
         return INVALID;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 848de29..1055fa6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -32,7 +32,7 @@ public class LSMBTreeWithBuddyIOOperationCallbackFactory implements ILSMIOOperat
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback() {
+    public ILSMIOOperationCallback createIoOpCallback() {
         return new LSMBTreeWithBuddyIOOperationCallback();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index ec09d65..657d908 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
 import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
@@ -35,16 +36,16 @@ public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationC
     }
 
     @Override
-    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
-            throws HyracksDataException {
+    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents,
+            ILSMDiskComponent newComponent) throws HyracksDataException {
         if (newComponent != null) {
             LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) newComponent;
-            putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents);
+            putLSNIntoMetadata(invIndexComponent, oldComponents);
         }
     }
 
     @Override
-    public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
+    public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation.
             synchronized (this) {
@@ -56,18 +57,19 @@ public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationC
         long maxLSN = -1;
         for (Object o : diskComponents) {
             LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) o;
-            maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), maxLSN);
+            maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()),
+                    maxLSN);
         }
         return maxLSN;
     }
 
     @Override
-    public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
             throws HyracksDataException {
         if (diskComponentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
             LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) diskComponent;
-            IMetadataPageManager metadataPageManager = (IMetadataPageManager) invIndexComponent.getDeletedKeysBTree()
-                    .getPageManager();
+            IMetadataPageManager metadataPageManager =
+                    (IMetadataPageManager) invIndexComponent.getDeletedKeysBTree().getPageManager();
             return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
         }
         return INVALID;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index 8951cb4..0fa0167 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -33,7 +33,7 @@ public class LSMInvertedIndexIOOperationCallbackFactory implements ILSMIOOperati
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback() {
+    public ILSMIOOperationCallback createIoOpCallback() {
         return new LSMInvertedIndexIOOperationCallback();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 175250d..2dc06f7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
 import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
@@ -35,16 +36,16 @@ public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback
     }
 
     @Override
-    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
-            throws HyracksDataException {
+    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents,
+            ILSMDiskComponent newComponent) throws HyracksDataException {
         if (newComponent != null) {
             LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) newComponent;
-            putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
+            putLSNIntoMetadata(rtreeComponent, oldComponents);
         }
     }
 
     @Override
-    public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
+    public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation.
             synchronized (this) {
@@ -62,12 +63,12 @@ public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback
     }
 
     @Override
-    public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
             throws HyracksDataException {
         if (diskComponentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)) {
             LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) diskComponent;
-            IMetadataPageManager metadataPageManager = (IMetadataPageManager) rtreeComponent.getRTree()
-                    .getPageManager();
+            IMetadataPageManager metadataPageManager =
+                    (IMetadataPageManager) rtreeComponent.getRTree().getPageManager();
             return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
         }
         return INVALID;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 954c6e1..83db16a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -32,7 +32,7 @@ public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallba
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback() {
+    public ILSMIOOperationCallback createIoOpCallback() {
         return new LSMRTreeIOOperationCallback();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d718dc4a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
index 7dae65f..b977c4d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -19,33 +19,104 @@
 package org.apache.asterix.common.transactions;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
 
 /**
  * A basic callback used to write marker to transaction logs
  */
 public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback {
-
-    private AbstractLSMIndex index;
+    private final LongPointable pointable = LongPointable.FACTORY.createPointable();
+    private final ILSMIndex index;
 
     /**
      * @param index:
      *            a pointer to the primary index used to store marker log info
      * @throws HyracksDataException
      */
-    public PrimaryIndexLogMarkerCallback(AbstractLSMIndex index) throws HyracksDataException {
+    public PrimaryIndexLogMarkerCallback(ILSMIndex index) throws HyracksDataException {
         this.index = index;
     }
 
     @Override
     public void before(ByteBuffer buffer) {
-        buffer.putLong(index.getCurrentMemoryComponent().getMostRecentMarkerLSN());
+        buffer.putLong(getLsn());
+    }
+
+    private long getLsn() {
+        long lsn;
+        try {
+            lsn = ComponentMetadataUtil.getLong(index.getCurrentMemoryComponent().getMetadata(),
+                    ComponentMetadataUtil.MARKER_LSN_KEY, ComponentMetadataUtil.NOT_FOUND);
+        } catch (HyracksDataException e) {
+            // Should never happen since this is a memory component
+            throw new IllegalStateException(e);
+        }
+        if (lsn == ComponentMetadataUtil.NOT_FOUND) {
+            synchronized (index.getOperationTracker()) {
+                // look for it in previous memory component if exists
+                lsn = lsnFromImmutableMemoryComponents();
+                if (lsn == ComponentMetadataUtil.NOT_FOUND) {
+                    // look for it in disk component
+                    lsn = lsnFromDiskComponents();
+                }
+            }
+        }
+        return lsn;
+    }
+
+    private long lsnFromDiskComponents() {
+        List<ILSMDiskComponent> diskComponents = index.getImmutableComponents();
+        for (ILSMDiskComponent c : diskComponents) {
+            try {
+                long lsn = ComponentMetadataUtil.getLong(c.getMetadata(), ComponentMetadataUtil.MARKER_LSN_KEY,
+                        ComponentMetadataUtil.NOT_FOUND);
+                if (lsn != ComponentMetadataUtil.NOT_FOUND) {
+                    return lsn;
+                }
+            } catch (HyracksDataException e) {
+                throw new IllegalStateException("Unable to read metadata page. Disk Error?", e);
+            }
+        }
+        return ComponentMetadataUtil.NOT_FOUND;
+    }
+
+    private long lsnFromImmutableMemoryComponents() {
+        List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
+        int numOtherMemComponents = memComponents.size() - 1;
+        int next = index.getCurrentMemoryComponentIndex();
+        long lsn = ComponentMetadataUtil.NOT_FOUND;
+        for (int i = 0; i < numOtherMemComponents; i++) {
+            next = next - 1;
+            if (next < 0) {
+                next = memComponents.size() - 1;
+            }
+            ILSMMemoryComponent c = index.getMemoryComponents().get(next);
+            if (c.isReadable()) {
+                try {
+                    lsn = ComponentMetadataUtil.getLong(c.getMetadata(), ComponentMetadataUtil.MARKER_LSN_KEY,
+                            ComponentMetadataUtil.NOT_FOUND);
+                } catch (HyracksDataException e) {
+                    // Should never happen since this is a memory component
+                    throw new IllegalStateException(e);
+                }
+                if (lsn != ComponentMetadataUtil.NOT_FOUND) {
+                    return lsn;
+                }
+            }
+        }
+        return lsn;
     }
 
     @Override
     public void after(long lsn) {
-        index.getCurrentMemoryComponent().setMostRecentMarkerLSN(lsn);
+        pointable.setLong(lsn);
+        index.getCurrentMemoryComponent().getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, pointable);
     }
 }


Mime
View raw message