asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [24/25] asterixdb git commit: Separate index build from index access
Date Thu, 11 May 2017 23:43:39 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 555f571..6ad85b3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -85,6 +85,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
 import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
 import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
@@ -95,7 +96,6 @@ import org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import org.apache.hyracks.storage.common.file.IFileMapManager;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
-import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
@@ -336,7 +336,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     }
 
     @Override
-    public IIOManager getIOManager() {
+    public IIOManager getIoManager() {
         return ioManager;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 07f341d..691be50 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -44,6 +44,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -56,7 +57,6 @@ import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.transactions.Resource;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -67,12 +67,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.common.file.LocalResource;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
 
 /**
  * This is the Recovery Manager and is responsible for rolling back a
@@ -307,7 +307,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         long lsn = -1;
         ILSMIndex index = null;
         LocalResource localResource = null;
-        Resource localResourceMetadata = null;
+        DatasetLocalResource localResourceMetadata = null;
         boolean foundWinner = false;
         JobEntityCommits jobEntityWinners = null;
 
@@ -359,6 +359,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                  * log record.
                                  *******************************************************************/
                                 if (localResource == null) {
+                                    LOGGER.log(Level.WARNING, "resource was not found for resource id " + resourceId);
                                     logRecord = logReader.next();
                                     continue;
                                 }
@@ -368,11 +369,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                 //if index is not registered into IndexLifeCycleManager,
                                 //create the index using LocalMetadata stored in LocalResourceRepository
                                 //get partition path in this node
-                                localResourceMetadata = (Resource) localResource.getResource();
+                                localResourceMetadata = (DatasetLocalResource) localResource.getResource();
                                 index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
                                 if (index == null) {
                                     //#. create index instance and register to indexLifeCycleManager
-                                    index = localResourceMetadata.createIndexInstance(serviceCtx, localResource);
+                                    index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
                                     datasetLifecycleManager.register(localResource.getPath(), index);
                                     datasetLifecycleManager.open(localResource.getPath());
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index 0e6cf9a..9f86a26 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -81,7 +81,7 @@ public class TransactionSubsystem implements ITransactionSubsystem {
         if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) {
             throw new IllegalStateException(
                     String.format("Storage version mismatch. Current version (%s). On disk version: (%s)",
-                            latestCheckpoint.getStorageVersion(), StorageConstants.VERSION));
+                            StorageConstants.VERSION, latestCheckpoint.getStorageVersion()));
         }
 
         if (replicationEnabled) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e0648ce..5047bc2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -524,7 +524,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
                 dataverseName + "." + datasetName, defaultCompactionPolicy);
         Dataset dataset = null;
-        Index primaryIndex = null;
         try {
             IDatasetDetails datasetDetails = null;
             Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
@@ -612,7 +611,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
                     MetadataUtil.PENDING_ADD_OP);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-            primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
             if (dd.getDatasetType() == DatasetType.INTERNAL) {
                 Dataverse dataverse =
                         MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
@@ -653,7 +651,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
-                    JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, primaryIndex, metadataProvider);
+                    JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
                     JobUtils.runJob(hcc, jobSpec, true);
@@ -872,7 +870,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-                List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(ds);
+                List<List<String>> partitioningKeys = ds.getPrimaryKeys();
                 for (List<String> partitioningKey : partitioningKeys) {
                     IAType keyType = aRecordType.getSubFieldType(partitioningKey);
                     ITypeTraits typeTrait = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
@@ -933,8 +931,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
                     }
                     // This is the first index for the external dataset, replicate the files index
-                    spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
-                            metadataProvider, true);
+                    spec = ExternalIndexingOperations.buildFilesIndexCreateJobSpec(ds, externalFilesSnapshot,
+                            metadataProvider);
                     if (spec == null) {
                         throw new CompilationException(
                                 "Failed to create job spec for replicating Files Index For external dataset");
@@ -1213,9 +1211,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                                     IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, datasets.get(j)));
                         }
                     }
-                    Index primaryIndex =
-                            MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
-                    jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(datasets.get(j), primaryIndex, metadataProvider));
+                    jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(datasets.get(j), metadataProvider));
                 } else {
                     // External dataset
                     List<Index> indexes =
@@ -2268,14 +2264,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             MetadataProvider metadataProvider, ARecordType enforcedType, ARecordType enforcedMeta)
             throws AlgebricksException {
         for (int j = 0; j < indexes.size(); j++) {
-            if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, indexes.get(j), aRecordType,
-                        metaRecordType, enforcedType, enforcedMeta, metadataProvider));
-            }
+            jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, indexes.get(j), aRecordType,
+                    metaRecordType, enforcedType, enforcedMeta, metadataProvider));
 
         }
-        jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider,
-                new StorageComponentProvider()));
     }
 
     private interface IMetadataLocker {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java
index d0d14c8..8326fe2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java
@@ -26,7 +26,6 @@ import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.freepage.AppendOnlyLinkedMetadataPageManagerFactory;
@@ -66,11 +65,6 @@ public class StorageComponentProvider implements IStorageComponentProvider {
     }
 
     @Override
-    public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
-        return RuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
     public IStorageManager getStorageManager() {
         return RuntimeComponentsProvider.RUNTIME_PROVIDER;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index ec01e1c..26952ad 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -205,7 +205,7 @@ public class NCApplication extends BaseNCApplication {
 
     private void performLocalCleanUp() {
         //Delete working area files from failed jobs
-        runtimeContext.getIOManager().deleteWorkspaceFiles();
+        runtimeContext.getIoManager().deleteWorkspaceFiles();
 
         //Reclaim storage for temporary datasets.
         String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index d1ff871..6155450 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -214,8 +214,8 @@ public class FeedOperations {
             JobSpecification subJob = jobsList.get(iter1);
             operatorIdMapping.clear();
             Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap();
-            FeedConnectionId feedConnectionId =
-                    new FeedConnectionId(ingestionOp.getEntityId(), feedConnections.get(iter1).getDatasetName());
+            String datasetName = feedConnections.get(iter1).getDatasetName();
+            FeedConnectionId feedConnectionId = new FeedConnectionId(ingestionOp.getEntityId(), datasetName);
 
             FeedPolicyEntity feedPolicyEntity =
                     FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
@@ -227,9 +227,8 @@ public class FeedOperations {
                 OperatorDescriptorId opId = null;
                 if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
                         && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
-                    String operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
                     metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
-                            feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, operandId);
+                            feedPolicyEntity.getProperties(), FeedRuntimeType.STORE);
                     opId = metaOp.getOperatorId();
                     opDesc.setOperatorId(opId);
                 } else {
@@ -243,7 +242,7 @@ public class FeedOperations {
                             // anything on the network interface needs to be message compatible
                             if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) {
                                 metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
-                                        feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE, null);
+                                        feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE);
                                 opId = metaOp.getOperatorId();
                                 opDesc.setOperatorId(opId);
                             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index cdb9b5b..856638f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -21,6 +21,7 @@ package org.apache.asterix.app.bootstrap;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Logger;
@@ -33,10 +34,9 @@ import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
-import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.IResourceFactory;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -48,16 +48,14 @@ import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 import org.apache.asterix.transaction.management.runtime.CommitRuntime;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.tuple.Pair;
@@ -68,6 +66,8 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -75,7 +75,6 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.util.HyracksConstants;
@@ -83,17 +82,17 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import org.apache.hyracks.storage.common.file.LocalResource;
+import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.util.file.FileUtil;
 import org.mockito.Mockito;
@@ -175,14 +174,13 @@ public class TestNodeController {
                 new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
                         primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
                         ResourceType.LSM_BTREE);
-        LSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
-                getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
-        IIndexDataflowHelperFactory dataflowHelperFactory =
-                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo, storageComponentProvider, dataset);
-        Mockito.when(indexOpDesc.getIndexDataflowHelperFactory()).thenReturn(dataflowHelperFactory);
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
-        LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(indexOpDesc, ctx,
-                PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider, op, true);
+        IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+        LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(ctx, PARTITION,
+                primaryIndexInfo.primaryIndexInsertFieldsPermutations,
+                recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), op,
+                true, indexHelperFactory, modOpCallbackFactory, null);
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
                 primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
@@ -200,18 +198,13 @@ public class TestNodeController {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
                 storageComponentProvider);
-        IIndexDataflowHelperFactory indexDataflowHelperFactory =
-                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo, storageComponentProvider, dataset);
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
         BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                primaryIndexInfo.fileSplitProvider, primaryIndexInfo.primaryIndexTypeTraits,
-                primaryIndexInfo.primaryIndexComparatorFactories, primaryIndexInfo.primaryIndexBloomFilterKeyFields,
-                primaryIndexInfo.primaryKeyIndexes, primaryIndexInfo.primaryKeyIndexes, true, true,
-                indexDataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, filterFields,
-                filterFields, storageComponentProvider.getMetadataPageManagerFactory());
-        BTreeSearchOperatorNodePushable searchOp = new BTreeSearchOperatorNodePushable(searchOpDesc, ctx, 0,
-                primaryIndexInfo.getSearchRecordDescriptorProvider(), /*primaryIndexInfo.primaryKeyIndexes*/null,
-                /*primaryIndexInfo.primaryKeyIndexes*/null, true, true, false, filterFields, filterFields);
+                null, null, true, true, indexDataflowHelperFactory, false, false, null,
+                NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
+        BTreeSearchOperatorNodePushable searchOp =
+                searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1);
         emptyTupleOp.setFrameWriter(0, searchOp,
                 primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
         searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
@@ -227,76 +220,8 @@ public class TestNodeController {
         return jobId;
     }
 
-    public LSMTreeInsertDeleteOperatorDescriptor getInsertOpratorDesc(PrimaryIndexInfo primaryIndexInfo,
-            IModificationOperationCallbackFactory modOpCallbackFactory) {
-        LSMTreeInsertDeleteOperatorDescriptor indexOpDesc = Mockito.mock(LSMTreeInsertDeleteOperatorDescriptor.class);
-        Mockito.when(indexOpDesc.getLifecycleManagerProvider()).thenReturn(RuntimeComponentsProvider.RUNTIME_PROVIDER);
-        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(RuntimeComponentsProvider.RUNTIME_PROVIDER);
-        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
-        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
-                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
-        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
-        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
-                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
-        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
-                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
-        Mockito.when(indexOpDesc.getModificationOpCallbackFactory()).thenReturn(modOpCallbackFactory);
-        Mockito.when(indexOpDesc.getPageManagerFactory())
-                .thenReturn(primaryIndexInfo.storageComponentProvider.getMetadataPageManagerFactory());
-        return indexOpDesc;
-    }
-
-    public TreeIndexCreateOperatorDescriptor getIndexCreateOpDesc(PrimaryIndexInfo primaryIndexInfo) {
-        TreeIndexCreateOperatorDescriptor indexOpDesc = Mockito.mock(TreeIndexCreateOperatorDescriptor.class);
-        Mockito.when(indexOpDesc.getLifecycleManagerProvider()).thenReturn(RuntimeComponentsProvider.RUNTIME_PROVIDER);
-        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(RuntimeComponentsProvider.RUNTIME_PROVIDER);
-        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
-        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
-                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
-        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
-        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
-                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
-        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
-                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
-        Mockito.when(indexOpDesc.getPageManagerFactory())
-                .thenReturn(primaryIndexInfo.storageComponentProvider.getMetadataPageManagerFactory());
-        return indexOpDesc;
-    }
-
-    public ConstantFileSplitProvider getFileSplitProvider(Dataset dataset) {
-        FileSplit fileSplit = new ManagedFileSplit(ExecutionTestUtil.integrationUtil.ncs[0].getId(),
-                dataset.getDataverseName() + File.separator + dataset.getDatasetName());
-        return new ConstantFileSplitProvider(new FileSplit[] { fileSplit });
-    }
-
-    public ILocalResourceFactoryProvider getPrimaryIndexLocalResourceMetadataProvider(
-            IStorageComponentProvider storageComponentProvider, Index index, Dataset dataset,
-            ITypeTraits[] primaryIndexTypeTraits, IBinaryComparatorFactory[] primaryIndexComparatorFactories,
-            int[] primaryIndexBloomFilterKeyFields, ILSMMergePolicyFactory mergePolicyFactory,
-            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
-            IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields,
-            ILSMOperationTrackerFactory opTrackerProvider) throws AlgebricksException {
-        IResourceFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory(primaryIndexTypeTraits,
-                primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(),
-                mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
-                filterFields, opTrackerProvider, dataset.getIoOperationCallbackFactory(index),
-                storageComponentProvider.getMetadataPageManagerFactory());
-        ILocalResourceFactoryProvider localResourceFactoryProvider =
-                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
-        return localResourceFactoryProvider;
-    }
-
-    public IIndexDataflowHelper getPrimaryIndexDataflowHelper(IHyracksTaskContext ctx,
-            PrimaryIndexInfo primaryIndexInfo, TreeIndexCreateOperatorDescriptor indexOpDesc,
-            IStorageComponentProvider storageComponentProvider, Dataset dataset)
-            throws AlgebricksException, HyracksDataException {
-        return getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo, storageComponentProvider, dataset)
-                .createIndexDataflowHelper(indexOpDesc, ctx, PARTITION);
-    }
-
-    public IIndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(IHyracksTaskContext ctx,
-            PrimaryIndexInfo primaryIndexInfo, IStorageComponentProvider storageComponentProvider, Dataset dataset)
-            throws AlgebricksException {
+    public IResourceFactory getPrimaryResourceFactory(IHyracksTaskContext ctx, PrimaryIndexInfo primaryIndexInfo,
+            IStorageComponentProvider storageComponentProvider, Dataset dataset) throws AlgebricksException {
         Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(),
                 MetadataUtil.PENDING_NO_OP);
         Index index = primaryIndexInfo.getIndex();
@@ -304,27 +229,13 @@ public class TestNodeController {
                 (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
         MetadataProvider mdProvider = new MetadataProvider(appCtx, dataverse, storageComponentProvider);
         try {
-            return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType,
-                    primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory,
-                    primaryIndexInfo.mergePolicyProperties);
+            return dataset.getResourceFactory(mdProvider, index, primaryIndexInfo.recordType, primaryIndexInfo.metaType,
+                    primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties);
         } finally {
             mdProvider.getLocks().unlock();
         }
     }
 
-    public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
-            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
-            Map<String, String> mergePolicyProperties, int[] filterFields,
-            IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
-            List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
-        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
-                storageComponentProvider);
-        TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
-        return getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo, indexOpDesc,
-                storageComponentProvider, dataset);
-    }
-
     public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
             ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
             int[] filterFields, IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
@@ -332,10 +243,23 @@ public class TestNodeController {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
                 storageComponentProvider);
-        TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
-        IIndexDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo,
-                indexOpDesc, storageComponentProvider, dataset);
-        dataflowHelper.create();
+        Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(),
+                MetadataUtil.PENDING_NO_OP);
+        MetadataProvider mdProvider = new MetadataProvider(
+                (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse,
+                storageComponentProvider);
+        try {
+            IResourceFactory resourceFactory = dataset.getResourceFactory(mdProvider, primaryIndexInfo.index,
+                    recordType, metaType, mergePolicyFactory, mergePolicyProperties);
+            IndexBuilderFactory indexBuilderFactory =
+                    new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
+                            primaryIndexInfo.fileSplitProvider, resourceFactory, !dataset.isTemp());
+            IHyracksTaskContext ctx = createTestContext(false);
+            IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, 0);
+            indexBuilder.build();
+        } finally {
+            mdProvider.getLocks().unlock();
+        }
     }
 
     private int[] createPrimaryIndexBloomFilterFields(int length) {
@@ -392,7 +316,7 @@ public class TestNodeController {
         }
         ctx = Mockito.spy(ctx);
         Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
-        Mockito.when(ctx.getIOManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
+        Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
         return ctx;
     }
 
@@ -430,7 +354,6 @@ public class TestNodeController {
         private ITypeTraits[] filterTypeTraits;
         private IBinaryComparatorFactory[] filterCmpFactories;
         private int[] btreeFields;
-        private ILocalResourceFactoryProvider localResourceFactoryProvider;
         private ConstantFileSplitProvider fileSplitProvider;
         private RecordDescriptor rDesc;
         private int[] primaryIndexInsertFieldsPermutations;
@@ -463,7 +386,6 @@ public class TestNodeController {
             filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(dataset, recordType,
                     NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider());
             btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
-            fileSplitProvider = getFileSplitProvider(dataset);
             primaryIndexSerdes =
                     createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
             rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
@@ -483,10 +405,9 @@ public class TestNodeController {
             index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
                     IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, true,
                     MetadataUtil.PENDING_NO_OP);
-            localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider, index,
-                    dataset, primaryIndexTypeTraits, primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields,
-                    mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
-                    filterFields, dataset.getIndexOperationTrackerFactory(index));
+            List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
+            FileSplit[] splits = SplitsAndConstraintsUtil.getDatasetSplits(dataset, nodes, index.getIndexName(), false);
+            fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
         }
 
         public Index getIndex() {
@@ -523,4 +444,21 @@ public class TestNodeController {
                 createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType);
         return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
     }
+
+    public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo,
+            IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
+        return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+    }
+
+    public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, int[] filterFields,
+            IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
+            List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
+                storageComponentProvider);
+        return getPrimaryIndexDataflowHelperFactory(primaryIndexInfo, storageComponentProvider)
+                .create(createTestContext(true), PARTITION);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1bcc88a..ef72c67 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.test.dataflow;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -105,9 +106,11 @@ public class LogMarkerTest {
             TestNodeController nc = new TestNodeController(null, false);
             nc.init();
             StorageComponentProvider storageManager = new StorageComponentProvider();
+            List<List<String>> partitioningKeys = new ArrayList<>();
+            partitioningKeys.add(Collections.singletonList("key"));
             Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
                     NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                            Collections.emptyList(), null, null, null, false, null, false),
+                            partitioningKeys, null, null, null, false, null, false),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
                 nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 35c3c42..d2bf3d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.test.logging;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -112,9 +113,11 @@ public class CheckpointingTest {
             TestNodeController nc = new TestNodeController(new File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false);
             StorageComponentProvider storageManager = new StorageComponentProvider();
             nc.init();
+            List<List<String>> partitioningKeys = new ArrayList<>();
+            partitioningKeys.add(Collections.singletonList("key"));
             Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
                     NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                            Collections.emptyList(), null, null, null, false, null, false),
+                            partitioningKeys, null, null, null, false, null, false),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
                 nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index ae19f23..a32d4dc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -25,9 +25,9 @@ import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 
 public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IIndex> {
     /**
@@ -73,19 +73,19 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
     /**
      * creates (if necessary) and returns the primary index operation tracker of a dataset.
      *
-     * @param datasetID
+     * @param datasetId
      * @return
      */
-    PrimaryIndexOperationTracker getOperationTracker(int datasetID);
+    PrimaryIndexOperationTracker getOperationTracker(int datasetId);
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
      *
-     * @param datasetID
+     * @param datasetId
      * @param ioDeviceNum
      * @return
      */
-    List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum);
+    List<IVirtualBufferCache> getVirtualBufferCaches(int datasetId, int ioDeviceNum);
 
     /**
      * Flushes then closes all open datasets

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 5e69746..a4b994b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -36,14 +36,14 @@ import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
-import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public interface INcApplicationContext extends IApplicationContext {
 
-    IIOManager getIOManager();
+    IIOManager getIoManager();
 
     Executor getThreadExecutor();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
index f122096..84e7ed5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
@@ -21,32 +21,33 @@ package org.apache.asterix.common.context;
 import java.util.List;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 
 public class AsterixVirtualBufferCacheProvider implements IVirtualBufferCacheProvider {
 
     private static final long serialVersionUID = 1L;
-    private final int datasetID;
+    private final int datasetId;
 
-    public AsterixVirtualBufferCacheProvider(int datasetID) {
-        this.datasetID = datasetID;
+    public AsterixVirtualBufferCacheProvider(int datasetId) {
+        this.datasetId = datasetId;
     }
 
     @Override
-    public List<IVirtualBufferCache> getVirtualBufferCaches(IHyracksTaskContext ctx,
-            IFileSplitProvider fileSplitProvider) throws HyracksDataException {
-        final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
-        IIOManager ioManager = ctx.getIOManager();
-        FileSplit fileSplit = fileSplitProvider.getFileSplits()[partition];
-        FileReference fileRef = fileSplit.getFileReference(ioManager);
+    public List<IVirtualBufferCache> getVirtualBufferCaches(INCServiceContext ctx, FileReference fileRef)
+            throws HyracksDataException {
+        IIOManager ioManager = ctx.getIoManager();
+        int deviceId = getDeviceId(ioManager, fileRef);
+        return ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager()
+                .getVirtualBufferCaches(datasetId, deviceId);
+    }
+
+    public static int getDeviceId(IIOManager ioManager, FileReference fileRef) {
         IODeviceHandle device = fileRef.getDeviceHandle();
         List<IODeviceHandle> devices = ioManager.getIODevices();
         int deviceId = 0;
@@ -56,8 +57,7 @@ public class AsterixVirtualBufferCacheProvider implements IVirtualBufferCachePro
                 deviceId = i;
             }
         }
-        return ((INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
-                .getDatasetLifecycleManager().getVirtualBufferCaches(datasetID, deviceId);
+        return deviceId;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 5c1d094..9cb1de5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -19,11 +19,11 @@
 package org.apache.asterix.common.context;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
 public class BaseOperationTracker implements ILSMOperationTracker {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index 17da26a..cd1d95e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -27,7 +27,6 @@ import java.util.Set;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
@@ -42,11 +41,11 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
     private int maxToleranceComponentCount;
 
     private final IDatasetLifecycleManager datasetLifecycleManager;
-    private final int datasetID;
+    private final int datasetId;
 
-    public CorrelatedPrefixMergePolicy(IResourceLifecycleManager datasetLifecycleManager, int datasetID) {
-        this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
-        this.datasetID = datasetID;
+    public CorrelatedPrefixMergePolicy(IDatasetLifecycleManager datasetLifecycleManager, int datasetId) {
+        this.datasetLifecycleManager = datasetLifecycleManager;
+        this.datasetId = datasetId;
     }
 
     @Override
@@ -80,7 +79,7 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
         int startIndex = -1;
         int minNumComponents = Integer.MAX_VALUE;
 
-        Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetInfo(datasetID).getDatasetIndexes();
+        Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetIndexes();
         for (ILSMIndex lsmIndex : indexes) {
             minNumComponents = Math.min(minNumComponents, lsmIndex.getImmutableComponents().size());
         }
@@ -122,8 +121,10 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
 
     @Override
     public void configure(Map<String, String> properties) {
-        maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
-        maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
+        maxMergableComponentSize =
+                Long.parseLong(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE));
+        maxToleranceComponentCount =
+                Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index cec9f57..3c141bc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -24,35 +24,26 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactory {
 
     private static final long serialVersionUID = 1L;
+    public static final String NAME = "correlated-prefix";
+    public static final String KEY_DATASET_ID = "datasetId";
+    public static final String KEY_MAX_COMPONENT_SIZE = "max-mergable-component-size";
+    public static final String KEY_MAX_COMPONENT_COUNT = "max-tolerance-component-count";
 
-    private static final String[] SET_VALUES = new String[] { "max-mergable-component-size",
-            "max-tolerance-component-count" };
+    private static final String[] SET_VALUES = new String[] { KEY_MAX_COMPONENT_SIZE, KEY_MAX_COMPONENT_COUNT };
     private static final Set<String> PROPERTIES_NAMES = new HashSet<>(Arrays.asList(SET_VALUES));
 
-    private int datasetID;
-
-    @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IHyracksTaskContext ctx) {
-        IDatasetLifecycleManager dslcManager = ((INcApplicationContext) ctx.getJobletContext()
-                .getServiceContext().getApplicationContext()).getDatasetLifecycleManager();
-        ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(dslcManager, datasetID);
-        policy.configure(properties);
-        return policy;
-    }
-
     @Override
     public String getName() {
-        return "correlated-prefix";
+        return NAME;
     }
 
     @Override
@@ -61,13 +52,12 @@ public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactor
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
-        ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(ilcm, datasetID);
-        policy.configure(properties);
+    public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, INCServiceContext ctx) {
+        IDatasetLifecycleManager dslcManager =
+                ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
+        int datasetId = Integer.parseInt(configuration.get(KEY_DATASET_ID));
+        ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(dslcManager, datasetId);
+        policy.configure(configuration);
         return policy;
     }
-
-    public void setDatasetID(int datasetID) {
-        this.datasetID = datasetID;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a8ccae..9529366 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -28,24 +28,24 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.Resource;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.LocalResource;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.LocalResource;
 
 public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
     private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
@@ -59,9 +59,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     private final int numPartitions;
     private volatile boolean stopped = false;
 
-    public DatasetLifecycleManager(StorageProperties storageProperties,
-            ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager,
-            int numPartitions) {
+    public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
+            int firstAvilableUserDatasetID, ILogManager logManager, int numPartitions) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
@@ -107,7 +106,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         if (lr == null) {
             return -1;
         }
-        return ((Resource) lr.getResource()).datasetId();
+        return ((DatasetLocalResource) lr.getResource()).getDatasetId();
     }
 
     public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -209,8 +208,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         for (DatasetResource dsr : datasetsResources) {
             PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
             if (opTracker != null && opTracker.getNumActiveOperations() == 0
-                    && dsr.getDatasetInfo().getReferenceCount() == 0
-                    && dsr.getDatasetInfo().isOpen()
+                    && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen()
                     && dsr.getDatasetInfo().getDatasetID() >= getFirstAvilableUserDatasetID()) {
                 closeDataset(dsr.getDatasetInfo());
                 return true;
@@ -221,8 +219,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
 
     private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
         if (iInfo.isOpen()) {
-            ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndexAccessor accessor =
+                    iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
         }
 
@@ -250,8 +248,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
                 DatasetInfo dsInfo = new DatasetInfo(did);
                 PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo);
                 DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
-                        getFirstAvilableUserDatasetID(),
-                        getNumPartitions());
+                        getFirstAvilableUserDatasetID(), getNumPartitions());
                 dsr = new DatasetResource(dsInfo, opTracker, vbcs);
                 datasets.put(did, dsr);
             }
@@ -320,8 +317,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public PrimaryIndexOperationTracker getOperationTracker(int datasetID) {
-        return datasets.get(datasetID).getOpTracker();
+    public PrimaryIndexOperationTracker getOperationTracker(int datasetId) {
+        return datasets.get(datasetId).getOpTracker();
     }
 
     private void validateDatasetLifecycleManagerState() throws HyracksDataException {
@@ -357,8 +354,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
             PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
             synchronized (opTracker) {
                 for (IndexInfo iInfo : dsr.getIndexes().values()) {
-                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
-                            .getIOOperationCallback();
+                    AbstractLSMIOOperationCallback ioCallback =
+                            (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
                     if (!(((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()
                             || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
                             || opTracker.isFlushOnExit())) {
@@ -400,16 +397,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
             }
             for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
                 //update resource lsn
-                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
-                        .getIOOperationCallback();
+                AbstractLSMIOOperationCallback ioOpCallback =
+                        (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
                 ioOpCallback.updateLastLSN(logRecord.getLSN());
             }
         }
 
         if (asyncFlush) {
             for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-                ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
-                        NoOpOperationCallback.INSTANCE);
+                ILSMIndexAccessor accessor =
+                        iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                 accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
             }
         } else {
@@ -501,9 +498,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         sb.append(String.format(dsHeaderFormat, "DatasetID", "Open", "Reference Count", "Last Access"));
         for (DatasetResource dsr : datasets.values()) {
             DatasetInfo dsInfo = dsr.getDatasetInfo();
-            sb.append(
-                    String.format(dsFormat, dsInfo.getDatasetID(), dsInfo.isOpen(), dsInfo.getReferenceCount(),
-                            dsInfo.getLastAccess()));
+            sb.append(String.format(dsFormat, dsInfo.getDatasetID(), dsInfo.isOpen(), dsInfo.getReferenceCount(),
+                    dsInfo.getLastAccess()));
         }
         sb.append("\n");
 
@@ -514,8 +510,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
             for (Map.Entry<Long, IndexInfo> entry : dsInfo.getIndexes().entrySet()) {
                 IndexInfo iInfo = entry.getValue();
                 sb.append(String.format(idxFormat, dsInfo.getDatasetID(), entry.getKey(), iInfo.isOpen(),
-                        iInfo.getReferenceCount(),
-                        iInfo.getIndex()));
+                        iInfo.getReferenceCount(), iInfo.getIndex()));
             }
         }
         outputStream.write(sb.toString().getBytes());
@@ -557,7 +552,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
                 while (used + additionalSize > capacity) {
                     if (!evictCandidateDataset()) {
                         throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
-                        + " memory since memory budget would be exceeded.");
+                                + " memory since memory budget would be exceeded.");
                     }
                 }
                 used += additionalSize;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 41e587d..a880ce2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -21,8 +21,8 @@ package org.apache.asterix.common.context;
 import java.util.Map;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
 
 /**
  * A dataset can be in one of two states { EVICTED , LOADED }.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java
index d454349..f528f12 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IStorageComponentProvider.java
@@ -20,7 +20,6 @@ package org.apache.asterix.common.context;
 
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
@@ -42,12 +41,6 @@ public interface IStorageComponentProvider {
     ILSMIOOperationSchedulerProvider getIoOperationSchedulerProvider();
 
     /**
-     * @return the application's root
-     *         {@link org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider} instance
-     */
-    IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider();
-
-    /**
      * @return {@link org.apache.hyracks.storage.common.IStorageManager} instance
      */
     IStorageManager getStorageManager();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index feeb578..903bb50 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -29,8 +29,6 @@ import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -38,6 +36,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
 public class PrimaryIndexOperationTracker extends BaseOperationTracker {
 
@@ -143,20 +143,16 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
     //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
     public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
         for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
-
             //get resource
             ILSMIndexAccessor accessor =
                     lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-
             //update resource lsn
             AbstractLSMIOOperationCallback ioOpCallback =
                     (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
             ioOpCallback.updateLastLSN(logRecord.getLSN());
-
             //schedule flush after update
             accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
         }
-
         flushLogCreated = false;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
new file mode 100644
index 0000000..78b5cb2
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.dataflow;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IResource;
+
+/**
+ * A local resource with a dataset id and an assigned partition
+ */
+public class DatasetLocalResource implements IResource {
+
+    private static final long serialVersionUID = 1L;
+    /**
+     * The dataset id
+     */
+    private final int datasetId;
+    /**
+     * The resource partition
+     */
+    private final int partition;
+    private final IResource resource;
+
+    public DatasetLocalResource(int datasetId, int partition, IResource resource) {
+        this.datasetId = datasetId;
+        this.partition = partition;
+        this.resource = resource;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public int getDatasetId() {
+        return datasetId;
+    }
+
+    @Override
+    public String getPath() {
+        return resource.getPath();
+    }
+
+    @Override
+    public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
+        return resource.createInstance(ncServiceCtx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index e5b78cf..b7adfde 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -25,72 +25,63 @@ import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.common.IStorageManager;
 
 /**
  * Provides methods for obtaining
- * {@link org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider},
  * {@link org.apache.hyracks.storage.common.IStorageManager},
  * {@link org.apache.hyracks.api.application.ICCServiceContext},
  * {@link org.apache.asterix.common.cluster.IGlobalRecoveryManager},
- * and {@link org.apache.asterix.common.library.ILibraryManager}
+ * {@link org.apache.asterix.common.library.ILibraryManager},
+ * {@link org.apache.asterix.common.transactions.IResourceIdManager}
+ *
  * at the cluster controller side.
  */
 public interface ICcApplicationContext extends IApplicationContext {
 
     /**
-     * Returns an instance of the implementation for IIndexLifecycleManagerProvider.
-     *
-     * @return IIndexLifecycleManagerProvider implementation instance
-     */
-    public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider();
-
-    /**
      * @return an instance which implements {@link org.apache.hyracks.storage.common.IStorageManager}
      */
-    public IStorageManager getStorageManager();
+    IStorageManager getStorageManager();
 
     /**
      * @return an instance which implements {@link org.apache.hyracks.api.application.ICCServiceContext}
      */
     @Override
-    public ICCServiceContext getServiceContext();
+    ICCServiceContext getServiceContext();
 
     /**
      * @return the global recovery manager which implements
      *         {@link org.apache.asterix.common.cluster.IGlobalRecoveryManager}
      */
-    public IGlobalRecoveryManager getGlobalRecoveryManager();
+    IGlobalRecoveryManager getGlobalRecoveryManager();
 
     /**
      * @return the active lifecycle listener at Cluster controller
      */
-    public IJobLifecycleListener getActiveLifecycleListener();
+    IJobLifecycleListener getActiveLifecycleListener();
 
     /**
      * @return a new instance of {@link IHyracksClientConnection}
      */
-    public IHyracksClientConnection getHcc();
+    IHyracksClientConnection getHcc();
 
     /**
-     * Returns the resource manager
-     *
-     * @return {@link IResourceIdManager} implementation instance
+     * @return the cluster wide resource id manager
      */
-    public IResourceIdManager getResourceIdManager();
+    IResourceIdManager getResourceIdManager();
 
     /**
      * Returns the storage component provider
      *
      * @return {@link IStorageComponentProvider} implementation instance
      */
-    public IStorageComponentProvider getStorageComponentProvider();
+    IStorageComponentProvider getStorageComponentProvider();
 
     /**
      * Returns the extension manager
      *
      * @return the extension manager instance
      */
-    public Object getExtensionManager();
+    Object getExtensionManager();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 4b70dd7..3f9fba9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -25,7 +25,6 @@ import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,8 +33,9 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -64,16 +64,17 @@ public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDel
     private int currentTupleIdx;
     private int lastFlushedTupleIdx;
 
-    public LSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op,
-            boolean isPrimary) throws HyracksDataException {
-        super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
+    public LSMInsertDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
+            RecordDescriptor inputRecDesc, IndexOperation op, boolean isPrimary,
+            IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
+            ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory,
+                tupleFilterFactory);
         this.isPrimary = isPrimary;
     }
 
     @Override
     public void open() throws HyracksDataException {
-        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
         appender = new FrameTupleAppender(writeBuffer);
@@ -85,10 +86,9 @@ public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDel
                 TaskUtil.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
             }
             writer.open();
-            modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResource(), ctx, this);
+            modCallback =
+                    modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
-            ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {
                 tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
                 frameTuple = new FrameTupleReference();


Mime
View raw message