asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject asterixdb git commit: Fix creation of callback factories
Date Fri, 24 Mar 2017 00:38:27 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 4a290fe19 -> 902db7851


Fix creation of callback factories

Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1613
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/902db785
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/902db785
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/902db785

Branch: refs/heads/master
Commit: 902db7851b9bd4a77b9cf3df18fe7049eecb2e1f
Parents: 4a290fe
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Thu Mar 23 16:49:08 2017 -0700
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Thu Mar 23 17:38:01 2017 -0700

----------------------------------------------------------------------
 .../ActiveSourceOperatorNodePushable.java       |  12 +-
 ...rtedIndexInsertDeleteOperatorDescriptor.java |   7 +-
 .../metadata/declared/MetadataProvider.java     | 139 ++++++-------------
 .../asterix/metadata/entities/Dataset.java      |  11 +-
 ...SMInvertedIndexUpsertOperatorDescriptor.java |  13 +-
 .../LSMTreeUpsertOperatorDescriptor.java        |   4 +-
 .../lsm/common/impls/SynchronousScheduler.java  |  17 ++-
 ...InvertedIndexInsertUpdateDeleteOperator.java |   5 +-
 8 files changed, 93 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/902db785/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 12f28c5..ac3caf3 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.active;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -28,6 +31,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNod
 public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
         implements IActiveRuntime {
 
+    private final Logger LOGGER = Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName());
     protected final IHyracksTaskContext ctx;
     protected final ActiveManager activeManager;
     /** A unique identifier for the runtime **/
@@ -79,6 +83,7 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp
 
     @Override
     public final void initialize() throws HyracksDataException {
+        LOGGER.log(Level.INFO, "initialize() called on ActiveSourceOperatorNodePushable");
         activeManager.registerRuntime(this);
         try {
             // notify cc that runtime has been registered
@@ -86,15 +91,18 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp
                     ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
             start();
         } catch (InterruptedException e) {
+            LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e);
             Thread.currentThread().interrupt();
             throw new HyracksDataException(e);
         } catch (Exception e) {
+            LOGGER.log(Level.INFO, "initialize() failed on ActiveSourceOperatorNodePushable", e);
             throw new HyracksDataException(e);
         } finally {
             synchronized (this) {
                 done = true;
                 notifyAll();
             }
+            LOGGER.log(Level.INFO, "initialize() returning on ActiveSourceOperatorNodePushable");
         }
     }
 
@@ -105,11 +113,13 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp
             ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
                     ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null);
         } catch (Exception e) {
+            LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e);
             throw new HyracksDataException(e);
+        } finally {
+            LOGGER.log(Level.INFO, "deinitialize() returning on ActiveSourceOperatorNodePushable");
         }
     }
 
-
     @Override
     public final IFrameWriter getInputFrameWriter(int index) {
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/902db785/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
index 2ff0617..4b28d42 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -49,13 +50,13 @@ public class LSMInvertedIndexInsertDeleteOperatorDescriptor extends LSMInvertedI
             IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
             IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
-            ITupleFilterFactory tupleFilterFactory,
-            IModificationOperationCallbackFactory modificationOpCallbackFactory, String indexName,
+            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, String indexName,
             IPageManagerFactory pageManagerFactory) {
         super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
                 fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory,
-                pageManagerFactory);
+                searchCallbackFactory, pageManagerFactory);
         this.indexName = indexName;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/902db785/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 39ea54d..f8f4a0e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -31,14 +31,12 @@ import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.IApplicationContextInfo;
 import org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -89,15 +87,6 @@ import org.apache.asterix.runtime.operators.LSMTreeUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
-import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -146,7 +135,6 @@ import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -157,7 +145,6 @@ import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescripto
 public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
 
     private final IStorageComponentProvider storaegComponentProvider;
-    private final ITransactionSubsystemProvider txnSubsystemProvider;
     private final IMetadataPageManagerFactory metadataPageManagerFactory;
     private final IPrimitiveValueProviderFactory primitiveValueProviderFactory;
     private final StorageProperties storageProperties;
@@ -182,7 +169,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
         this.storaegComponentProvider = componentProvider;
         storageProperties = AppContextInfo.INSTANCE.getStorageProperties();
         libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
-        txnSubsystemProvider = componentProvider.getTransactionSubsystemProvider();
         metadataPageManagerFactory = componentProvider.getMetadataPageManagerFactory();
         primitiveValueProviderFactory = componentProvider.getPrimitiveValueProviderFactory();
     }
@@ -457,7 +443,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
         boolean isSecondary = true;
         int numSecondaryKeys = 0;
         try {
-            boolean temp = dataset.getDatasetDetails().isTemp();
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
             if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL))
{
@@ -521,27 +506,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
-
-            ISearchOperationCallbackFactory searchCallbackFactory;
-            if (isSecondary) {
-                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                        : new SecondaryIndexSearchOperationCallbackFactory();
-            } else {
-                int datasetId = dataset.getDatasetId();
-                int[] primaryKeyFields = new int[numPrimaryKeys];
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                    primaryKeyFields[i] = i;
-                }
-
-                /**
-                 * Due to the read-committed isolation level,
-                 * we may acquire very short duration lock(i.e., instant lock) for readers.
-                 */
-                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                        : new PrimaryIndexInstantSearchOperationCallbackFactory(
-                                ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(),
-                                datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+            int[] primaryKeyFields = new int[numPrimaryKeys];
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                primaryKeyFields[i] = i;
             }
+
+            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, theIndex, jobId, IndexOperation.SEARCH, primaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo
=
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             RuntimeComponentsProvider rtcProvider = RuntimeComponentsProvider.RUNTIME_PROVIDER;
@@ -577,8 +548,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
         try {
             ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(),
dataset.getItemTypeName());
             int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
-
-            boolean temp = dataset.getDatasetDetails().isTemp();
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
             if (secondaryIndex == null) {
@@ -646,8 +615,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
             }
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo
=
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            ISearchOperationCallbackFactory searchCallbackFactory =
-                    temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory();
+            int[] primaryKeyFields = new int[numPrimaryKeys];
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, IndexOperation.SEARCH,
primaryKeyFields);
             RTreeSearchOperatorDescriptor rtreeSearchOp;
             IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
                     secondaryIndex, recType, metaType, compactionInfo.first, compactionInfo.second);
@@ -998,7 +972,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
                 throw new AlgebricksException(" Unabel to create merge policy factory for
external dataset", e);
             }
 
-            boolean temp = datasetDetails.isTemp();
             String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
             Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), fileIndexName);
@@ -1011,8 +984,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
                     dataset.getDatasetName(), fileIndexName, false);
-            ISearchOperationCallbackFactory searchOpCallbackFactory =
-                    temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory();
+            ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
+                    .getSearchCallbackFactory(storaegComponentProvider, fileIndex, jobId,
IndexOperation.SEARCH, null);
             // Create the operator
             ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec,
adapterFactory,
                     outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
@@ -1085,20 +1058,16 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
                     getSplitProviderAndConstraints(dataset);
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[numKeys];
             for (i = 0; i < numKeys; i++) {
                 primaryKeyFields[i] = i;
             }
 
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId,
datasetId,
-                            primaryKeyFields, txnSubsystemProvider, Operation.UPSERT, ResourceType.LSM_BTREE)
-                    : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
txnSubsystemProvider,
-                            Operation.UPSERT, ResourceType.LSM_BTREE);
+            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT,
primaryKeyFields);
 
-            LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
-                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT,
primaryKeyFields);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo
=
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1281,18 +1250,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
                     getSplitProviderAndConstraints(dataset);
 
             // prepare callback
-            int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[numKeys];
             for (i = 0; i < numKeys; i++) {
                 primaryKeyFields[i] = i;
             }
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
-                            ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(),
datasetId,
-                            primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
ResourceType.LSM_BTREE)
-                    : new PrimaryIndexModificationOperationCallbackFactory(
-                            ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(),
datasetId,
-                            primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
ResourceType.LSM_BTREE);
+            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = dataset
+                    .getSearchCallbackFactory(storaegComponentProvider, primaryIndex, jobId,
indexOp, primaryKeyFields);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo
=
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1310,7 +1275,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
typeTraits,
                         comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
idfh, null, true,
-                        indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+                        indexName, null, modificationCallbackFactory, searchCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1483,15 +1448,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId,
datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_BTREE)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_BTREE);
-
+            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchOpCallbackFactory = dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo
=
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this,
secondaryIndex, itemType,
@@ -1508,13 +1468,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
                 op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
typeTraits,
                         comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh,
filterFactory, false,
-                        indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
+                        indexName, null, modificationCallbackFactory, searchOpCallbackFactory,
prevFieldPermutation,
+                        metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
typeTraits,
                         comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
idfh, filterFactory,
-                        false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+                        false, indexName, null, modificationCallbackFactory, searchOpCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1641,15 +1601,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId,
datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_RTREE)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_RTREE);
-
+            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo
=
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this,
@@ -1666,13 +1621,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
                 op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
typeTraits,
                         comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory,
filterFactory, false,
-                        indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
+                        indexName, null, modificationCallbackFactory, searchCallbackFactory,
prevFieldPermutation,
+                        metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
typeTraits,
                         comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory,
filterFactory,
-                        false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+                        false, indexName, null, modificationCallbackFactory, searchCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1851,14 +1806,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId,
String>
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId,
datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_INVERTED_INDEX)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_INVERTED_INDEX);
+            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo
=
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this,
@@ -1875,14 +1826,16 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(),
tokenTypeTraits,
                         tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories,
tokenizerFactory,
-                        fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory,
indexName,
+                        fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory,
+                        searchCallbackFactory, indexName,
                         prevFieldPermutation, metadataPageManagerFactory);
             } else {
                 op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
                         appContext.getStorageManager(), splitsAndConstraint.first,
                         appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                         invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, modificationCallbackFactory,
indexName,
+                        indexDataFlowFactory, filterFactory, modificationCallbackFactory, searchCallbackFactory,
+                        indexName,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/902db785/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 82fe036..e0607a6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -68,6 +68,7 @@ import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperati
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
@@ -519,9 +520,13 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             int[] primaryKeyFields) throws AlgebricksException {
         if (getDatasetDetails().isTemp()) {
             return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId,
getDatasetId(),
-                            primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
Operation.get(op),
-                            index.resourceType())
+                    ? index.isPrimaryIndex()
+                            ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId,
datasetId,
+                                    primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
+                                    Operation.get(op), index.resourceType())
+                            : new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId,
getDatasetId(),
+                                    primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
+                                    Operation.get(op), index.resourceType())
                     : NoOpOperationCallbackFactory.INSTANCE;
         } else if (index.isPrimaryIndex()) {
             return op == IndexOperation.UPSERT

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/902db785/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
index f1547a8..02c1908 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
@@ -31,14 +31,14 @@ import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 
-public class LSMInvertedIndexUpsertOperatorDescriptor
-        extends LSMInvertedIndexInsertDeleteOperatorDescriptor {
+public class LSMInvertedIndexUpsertOperatorDescriptor extends LSMInvertedIndexInsertDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
     private final int[] prevFieldPermutation;
@@ -50,18 +50,19 @@ public class LSMInvertedIndexUpsertOperatorDescriptor
             IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory
tokenizerFactory,
             int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory
modificationOpCallbackFactory,
-            String indexName, int[] prevFieldPermutation, IPageManagerFactory pageManagerFactory)
{
+            ISearchOperationCallbackFactory searchCallbackFactory, String indexName, int[]
prevFieldPermutation,
+            IPageManagerFactory pageManagerFactory) {
         super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider,
tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories,
tokenizerFactory,
                 fieldPermutation, IndexOperation.UPSERT, dataflowHelperFactory, tupleFilterFactory,
-                modificationOpCallbackFactory, indexName, pageManagerFactory);
+                modificationOpCallbackFactory, searchCallbackFactory, indexName, pageManagerFactory);
         this.prevFieldPermutation = prevFieldPermutation;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
-        return new LSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
-                recordDescProvider, prevFieldPermutation);
+        return new LSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation, recordDescProvider,
+                prevFieldPermutation);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/902db785/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index b37ecae..e6cfd2a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -54,13 +54,13 @@ public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperator
             IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[]
fieldPermutation,
             IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
             boolean isPrimary, String indexName, IMissingWriterFactory missingWriterFactory,
-            IModificationOperationCallbackFactory modificationOpCallbackProvider,
+            IModificationOperationCallbackFactory modificationOpCallbackFactory,
             ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation,
             IPageManagerFactory pageManagerFactory, IFrameOperationCallbackFactory frameOpCallbackFactory) {
         super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider,
typeTraits,
                 comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
                 dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, missingWriterFactory,
-                modificationOpCallbackProvider, searchOpCallbackProvider, pageManagerFactory);
+                modificationOpCallbackFactory, searchOpCallbackProvider, pageManagerFactory);
         this.prevValuePermutation = prevValuePermutation;
         this.frameOpCallbackFactory = frameOpCallbackFactory;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/902db785/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
index e9c0e5f..da3e986 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
@@ -18,20 +18,27 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 
-public enum SynchronousScheduler implements ILSMIOOperationScheduler {
-    INSTANCE;
+public class SynchronousScheduler implements ILSMIOOperationScheduler {
+    private static final Logger LOGGER = Logger.getLogger(SynchronousScheduler.class.getName());
+    public static final SynchronousScheduler INSTANCE = new SynchronousScheduler();
+
+    private SynchronousScheduler() {
+    }
 
     @Override
     public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
         try {
             operation.call();
-        } catch (IndexException e) {
-            throw new HyracksDataException(e);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "IO Operation failed", e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/902db785/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
index 46201d5..a342370 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
@@ -31,9 +31,9 @@ import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -55,11 +55,12 @@ public class LSMInvertedIndexInsertUpdateDeleteOperator extends AbstractLSMInver
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory,
             IPageManagerFactory pageManagerFactory) {
         super(spec, 1, 1, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider,
tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories,
tokenizerFactory,
                 dataflowHelperFactory, tupleFilterFactory, false, false,
-                null, NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                null, NoOpLocalResourceFactoryProvider.INSTANCE, searchCallbackFactory,
                 modificationOpCallbackFactory,
                 pageManagerFactory);
         this.fieldPermutation = fieldPermutation;


Mime
View raw message