Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A4390200CA9 for ; Fri, 12 May 2017 01:43:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A2E8C160BCE; Thu, 11 May 2017 23:43:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 80777160BDB for ; Fri, 12 May 2017 01:43:20 +0200 (CEST) Received: (qmail 33552 invoked by uid 500); 11 May 2017 23:43:19 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 33027 invoked by uid 99); 11 May 2017 23:43:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 May 2017 23:43:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 616C5E943C; Thu, 11 May 2017 23:43:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.apache.org Date: Thu, 11 May 2017 23:43:36 -0000 Message-Id: In-Reply-To: <5f594fe565684258b006e5f3b566db33@git.apache.org> References: <5f594fe565684258b006e5f3b566db33@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/25] asterixdb git commit: Separate index build from index access archived-at: Thu, 11 May 2017 23:43:22 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java new file mode 100644 index 0000000..bfc6a8e --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.declared; + +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.indexing.FilesIndexDescription; +import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.metadata.api.IResourceFactoryProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.utils.IndexUtil; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.algebricks.data.ITypeTraitProvider; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; +import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeLocalResourceFactory; +import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyLocalResourceFactory; +import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; +import org.apache.hyracks.storage.common.IResourceFactory; +import org.apache.hyracks.storage.common.IStorageManager; + +public class BTreeResourceFactoryProvider implements IResourceFactoryProvider { + + public static final BTreeResourceFactoryProvider INSTANCE = new BTreeResourceFactoryProvider(); + + private BTreeResourceFactoryProvider() { + } + + @Override + public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Dataset dataset, Index index, + ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, + Map mergePolicyProperties, ITypeTraits[] filterTypeTraits, + IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException { + int[] filterFields = IndexUtil.getFilterFields(dataset, index, filterTypeTraits); + int[] btreeFields = IndexUtil.getBtreeFieldsIfFiltered(dataset, index); + IStorageComponentProvider storageComponentProvider = mdProvider.getStorageComponentProvider(); + ITypeTraits[] typeTraits = getTypeTraits(mdProvider, dataset, index, recordType, metaType); + IBinaryComparatorFactory[] cmpFactories = getCmpFactories(mdProvider, dataset, index, recordType, metaType); + int[] bloomFilterFields = getBloomFilterFields(dataset, index); + boolean durable = !dataset.isTemp(); + double bloomFilterFalsePositiveRate = mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate(); + ILSMOperationTrackerFactory opTrackerFactory = dataset.getIndexOperationTrackerFactory(index); + ILSMIOOperationCallbackFactory ioOpCallbackFactory = dataset.getIoOperationCallbackFactory(index); + IStorageManager storageManager = storageComponentProvider.getStorageManager(); + IMetadataPageManagerFactory metadataPageManagerFactory = + storageComponentProvider.getMetadataPageManagerFactory(); + ILSMIOOperationSchedulerProvider ioSchedulerProvider = + storageComponentProvider.getIoOperationSchedulerProvider(); + switch (dataset.getDatasetType()) { + case EXTERNAL: + return index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName())) + ? new ExternalBTreeLocalResourceFactory(storageManager, typeTraits, cmpFactories, + filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory, + ioOpCallbackFactory, metadataPageManagerFactory, ioSchedulerProvider, + mergePolicyFactory, mergePolicyProperties, durable, bloomFilterFields, + bloomFilterFalsePositiveRate, false, btreeFields) + : new ExternalBTreeWithBuddyLocalResourceFactory(storageManager, typeTraits, cmpFactories, + filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory, + ioOpCallbackFactory, metadataPageManagerFactory, ioSchedulerProvider, + mergePolicyFactory, mergePolicyProperties, durable, bloomFilterFields, + bloomFilterFalsePositiveRate, false, btreeFields); + case INTERNAL: + AsterixVirtualBufferCacheProvider vbcProvider = + new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()); + return new LSMBTreeLocalResourceFactory(storageManager, typeTraits, cmpFactories, filterTypeTraits, + filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory, + metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, + mergePolicyProperties, durable, bloomFilterFields, bloomFilterFalsePositiveRate, + index.isPrimaryIndex(), btreeFields); + default: + throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE, + dataset.getDatasetType().toString()); + } + } + + private static ITypeTraits[] getTypeTraits(MetadataProvider metadataProvider, Dataset dataset, Index index, + ARecordType recordType, ARecordType metaType) throws AlgebricksException { + ITypeTraits[] primaryTypeTraits = dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType); + if (index.isPrimaryIndex()) { + return primaryTypeTraits; + } else if (dataset.getDatasetType() == DatasetType.EXTERNAL + && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { + return FilesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS; + } + int numPrimaryKeys = dataset.getPrimaryKeys().size(); + int numSecondaryKeys = index.getKeyFieldNames().size(); + ITypeTraitProvider typeTraitProvider = metadataProvider.getStorageComponentProvider().getTypeTraitProvider(); + ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys]; + for (int i = 0; i < numSecondaryKeys; i++) { + ARecordType sourceType; + List keySourceIndicators = index.getKeyFieldSourceIndicators(); + if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) { + sourceType = recordType; + } else { + sourceType = metaType; + } + Pair keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i), + index.getKeyFieldNames().get(i), sourceType); + IAType keyType = keyTypePair.first; + secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType); + } + // Add serializers and comparators for primary index fields. + for (int i = 0; i < numPrimaryKeys; i++) { + secondaryTypeTraits[numSecondaryKeys + i] = primaryTypeTraits[i]; + } + return secondaryTypeTraits; + } + + private static IBinaryComparatorFactory[] getCmpFactories(MetadataProvider metadataProvider, Dataset dataset, + Index index, ARecordType recordType, ARecordType metaType) throws AlgebricksException { + IBinaryComparatorFactory[] primaryCmpFactories = + dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType); + if (index.isPrimaryIndex()) { + return dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType); + } else if (dataset.getDatasetType() == DatasetType.EXTERNAL + && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { + return FilesIndexDescription.FILES_INDEX_COMP_FACTORIES; + } + int numPrimaryKeys = dataset.getPrimaryKeys().size(); + int numSecondaryKeys = index.getKeyFieldNames().size(); + IBinaryComparatorFactoryProvider cmpFactoryProvider = + metadataProvider.getStorageComponentProvider().getComparatorFactoryProvider(); + IBinaryComparatorFactory[] secondaryCmpFactories = + new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys]; + for (int i = 0; i < numSecondaryKeys; i++) { + ARecordType sourceType; + List keySourceIndicators = index.getKeyFieldSourceIndicators(); + if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) { + sourceType = recordType; + } else { + sourceType = metaType; + } + Pair keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(i), + index.getKeyFieldNames().get(i), sourceType); + IAType keyType = keyTypePair.first; + secondaryCmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true); + } + // Add serializers and comparators for primary index fields. + for (int i = 0; i < numPrimaryKeys; i++) { + secondaryCmpFactories[numSecondaryKeys + i] = primaryCmpFactories[i]; + } + return secondaryCmpFactories; + } + + private static int[] getBloomFilterFields(Dataset dataset, Index index) throws AlgebricksException { + if (index.isPrimaryIndex()) { + return dataset.getPrimaryBloomFilterFields(); + } else if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + if (index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { + return FilesIndexDescription.BLOOM_FILTER_FIELDS; + } else { + return new int[] { index.getKeyFieldNames().size() }; + } + } + int numKeys = index.getKeyFieldNames().size(); + int[] bloomFilterKeyFields = new int[numKeys]; + for (int i = 0; i < numKeys; i++) { + bloomFilterKeyFields[i] = i; + } + return bloomFilterKeyFields; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java index 9b2d4c4..b13f4c2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java @@ -27,7 +27,6 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.InternalDatasetDetails; -import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; @@ -60,7 +59,7 @@ public class LoadableDataSource extends DataSource { this.targetDataset = targetDataset; this.adapter = adapter; this.adapterProperties = properties; - partitioningKeys = DatasetUtil.getPartitioningKeys(targetDataset); + partitioningKeys = targetDataset.getPrimaryKeys(); ARecordType recType = (ARecordType) itemType; isPKAutoGenerated = ((InternalDatasetDetails) targetDataset.getDatasetDetails()).isAutogenerated(); if (isPKAutoGenerated) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/735532e4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 774b73e..5934f5e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -32,7 +32,6 @@ import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; @@ -69,8 +68,8 @@ import org.apache.asterix.metadata.entities.Feed; import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; +import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; import org.apache.asterix.metadata.lock.LockList; import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.metadata.utils.DatasetUtil; @@ -84,10 +83,9 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil; import org.apache.asterix.runtime.base.AsterixTupleFilterFactory; import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.runtime.operators.LSMInvertedIndexUpsertOperatorDescriptor; -import org.apache.asterix.runtime.operators.LSMTreeUpsertOperatorDescriptor; +import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor; +import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor; import org.apache.asterix.runtime.utils.ClusterStateManager; -import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -124,7 +122,6 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.primitive.ShortPointable; import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; @@ -135,13 +132,13 @@ import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor; +import org.apache.hyracks.storage.common.IStorageManager; public class MetadataProvider implements IMetadataProvider { @@ -450,7 +447,6 @@ public class MetadataProvider implements IMetadataProvider boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException { boolean isSecondary = true; - int numSecondaryKeys = 0; try { Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); @@ -459,62 +455,10 @@ public class MetadataProvider implements IMetadataProvider } Index theIndex = isSecondary ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), indexName) : primaryIndex; - int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size(); + int numPrimaryKeys = dataset.getPrimaryKeys().size(); RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); - int[] bloomFilterKeyFields; - ITypeTraits[] typeTraits; - IBinaryComparatorFactory[] comparatorFactories; - - ARecordType itemType = - (ARecordType) this.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); - ARecordType metaType = null; - List primaryKeyIndicators = null; - if (dataset.hasMetaPart()) { - metaType = - (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); - primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator(); - } - - ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType); - int[] filterFields; - int[] btreeFields; - if (isSecondary) { - numSecondaryKeys = theIndex.getKeyFieldNames().size(); - bloomFilterKeyFields = new int[numSecondaryKeys]; - for (int i = 0; i < numSecondaryKeys; i++) { - bloomFilterKeyFields[i] = i; - } - Pair comparatorFactoriesAndTypeTraits = - getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(theIndex.getKeyFieldNames(), - theIndex.getKeyFieldTypes(), DatasetUtil.getPartitioningKeys(dataset), itemType, - dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators, - theIndex.getKeyFieldSourceIndicators(), metaType); - comparatorFactories = comparatorFactoriesAndTypeTraits.first; - typeTraits = comparatorFactoriesAndTypeTraits.second; - if (filterTypeTraits != null) { - filterFields = new int[1]; - filterFields[0] = numSecondaryKeys + numPrimaryKeys; - btreeFields = new int[numSecondaryKeys + numPrimaryKeys]; - for (int k = 0; k < btreeFields.length; k++) { - btreeFields[k] = k; - } - } - - } else { - bloomFilterKeyFields = new int[numPrimaryKeys]; - for (int i = 0; i < numPrimaryKeys; i++) { - bloomFilterKeyFields[i] = i; - } - // get meta item type - ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); - typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType); - comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType, - context.getBinaryComparatorFactoryProvider()); - } - - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); - Pair spPc; - spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName()); + Pair spPc = + getSplitProviderAndConstraints(dataset, theIndex.getIndexName()); int[] primaryKeyFields = new int[numPrimaryKeys]; for (int i = 0; i < numPrimaryKeys; i++) { primaryKeyFields[i] = i; @@ -522,27 +466,20 @@ public class MetadataProvider implements IMetadataProvider ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( storaegComponentProvider, theIndex, jobId, IndexOperation.SEARCH, primaryKeyFields); - Pair> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); - RuntimeComponentsProvider rtcProvider = RuntimeComponentsProvider.RUNTIME_PROVIDER; + IStorageManager storageManager = getStorageComponentProvider().getStorageManager(); + IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first); BTreeSearchOperatorDescriptor btreeSearchOp; if (dataset.getDatasetType() == DatasetType.INTERNAL) { - btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, - appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first, - typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields, - lowKeyInclusive, highKeyInclusive, - dataset.getIndexDataflowHelperFactory(this, theIndex, itemType, metaType, compactionInfo.first, - compactionInfo.second), - retainInput, retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, - minFilterFieldIndexes, maxFilterFieldIndexes, metadataPageManagerFactory); + btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields, + lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing, + context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes, + maxFilterFieldIndexes, false); } else { - IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, - theIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); - btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider, - rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, - highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput, - retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, - metadataPageManagerFactory); + btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, + highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, + retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes, + maxFilterFieldIndexes, + ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this)); } return new Pair<>(btreeSearchOp, spPc.second); } catch (MetadataException me) { @@ -555,75 +492,16 @@ public class MetadataProvider implements IMetadataProvider JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName, int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException { try { - ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); - int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size(); + int numPrimaryKeys = dataset.getPrimaryKeys().size(); Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), indexName); if (secondaryIndex == null) { throw new AlgebricksException( "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName()); } - List> secondaryKeyFields = secondaryIndex.getKeyFieldNames(); - List secondaryKeyTypes = secondaryIndex.getKeyFieldTypes(); - int numSecondaryKeys = secondaryKeyFields.size(); - if (numSecondaryKeys != 1) { - throw new AlgebricksException( - "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. " - + "There can be only one field as a key for the R-tree index."); - } - Pair keyTypePair = - Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType); - IAType keyType = keyTypePair.first; - if (keyType == null) { - throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema."); - } - int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag()); - int numNestedSecondaryKeyFields = numDimensions * 2; - IPrimitiveValueProviderFactory[] valueProviderFactories = - new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields]; - for (int i = 0; i < numNestedSecondaryKeyFields; i++) { - valueProviderFactories[i] = primitiveValueProviderFactory; - } - RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); - // IS NOT THE VARIABLE BELOW ALWAYS = 0 ?? - int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys; - if (retainInput) { - keysStartIndex -= numNestedSecondaryKeyFields; - } - IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories( - outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context); - ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, - numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context); - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); Pair spPc = getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); - ARecordType metaType = null; - if (dataset.hasMetaPart()) { - metaType = - (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); - } - - IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories( - dataset, recType, metaType, context.getBinaryComparatorFactoryProvider()); - int[] btreeFields = new int[primaryComparatorFactories.length]; - for (int i = 0; i < btreeFields.length; i++) { - btreeFields[i] = i + numNestedSecondaryKeyFields; - } - - ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recType); - int[] filterFields; - int[] rtreeFields; - if (filterTypeTraits != null) { - filterFields = new int[1]; - filterFields[0] = numNestedSecondaryKeyFields + numPrimaryKeys; - rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys]; - for (int i = 0; i < rtreeFields.length; i++) { - rtreeFields[i] = i; - } - } - Pair> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); int[] primaryKeyFields = new int[numPrimaryKeys]; for (int i = 0; i < numPrimaryKeys; i++) { primaryKeyFields[i] = i; @@ -632,21 +510,18 @@ public class MetadataProvider implements IMetadataProvider ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( storaegComponentProvider, secondaryIndex, jobId, IndexOperation.SEARCH, primaryKeyFields); RTreeSearchOperatorDescriptor rtreeSearchOp; - IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, - secondaryIndex, recType, metaType, compactionInfo.first, compactionInfo.second); + IIndexDataflowHelperFactory indexDataflowHelperFactory = + new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), spPc.first); if (dataset.getDatasetType() == DatasetType.INTERNAL) { - rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, - appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first, - typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput, - retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes, - maxFilterFieldIndexes, metadataPageManagerFactory); + rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true, + indexDataflowHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(), + searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, false); } else { // Create the operator - rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, - appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first, - typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput, - retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, - metadataPageManagerFactory); + rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true, + indexDataflowHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(), + searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, + ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this)); } return new Pair<>(rtreeSearchOp, spPc.second); @@ -696,19 +571,16 @@ public class MetadataProvider implements IMetadataProvider JobSpecification spec) throws AlgebricksException { String dataverseName = dataSource.getId().getDataverseName(); String datasetName = dataSource.getId().getDatasourceName(); - Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); int numKeys = keys.size(); int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; // move key fields to front int[] fieldPermutation = new int[numKeys + 1 + numFilterFields]; - int[] bloomFilterKeyFields = new int[numKeys]; int i = 0; for (LogicalVariable varKey : keys) { int idx = propagatedSchema.findVariable(varKey); fieldPermutation[i] = idx; - bloomFilterKeyFields[i] = i; i++; } fieldPermutation[numKeys] = propagatedSchema.findVariable(payload); @@ -720,38 +592,19 @@ public class MetadataProvider implements IMetadataProvider try { boolean temp = dataset.getDatasetDetails().isTemp(); isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp; - Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), - dataset.getDatasetName(), dataset.getDatasetName()); - ARecordType metaType = dataset.hasMetaPart() - ? (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()) - : null; - String itemTypeName = dataset.getItemTypeName(); - ARecordType itemType = (ARecordType) MetadataManager.INSTANCE - .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype(); - ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, null); - IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, - itemType, metaType, context.getBinaryComparatorFactoryProvider()); - Pair splitsAndConstraint = getSplitProviderAndConstraints(dataset); - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); long numElementsHint = getCardinalityPerPartitionHint(dataset); - // TODO // figure out the right behavior of the bulkload and then give the // right callback // (ex. what's the expected behavior when there is an error during // bulkload?) - Pair> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + IIndexDataflowHelperFactory indexHelperFactory = + new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first); TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = - new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, bloomFilterKeyFields, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, - false, numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex, - itemType, metaType, compactionInfo.first, compactionInfo.second), - metadataPageManagerFactory); + new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory); return new Pair<>(btreeBulkLoad, splitsAndConstraint.second); } catch (MetadataException me) { throw new AlgebricksException(me); @@ -762,19 +615,19 @@ public class MetadataProvider implements IMetadataProvider public Pair getInsertRuntime( IDataSource dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List keys, LogicalVariable payload, List additionalNonKeyFields, - List additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context, + List additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException { return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload, - additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields); + additionalNonKeyFields, inputRecordDesc, context, spec, bulkload, additionalNonFilteringFields); } @Override public Pair getDeleteRuntime( IDataSource dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List keys, LogicalVariable payload, List additionalNonKeyFields, - RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException { + RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException { return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload, - additionalNonKeyFields, recordDesc, context, spec, false, null); + additionalNonKeyFields, inputRecordDesc, context, spec, false, null); } @Override @@ -947,12 +800,6 @@ public class MetadataProvider implements IMetadataProvider return ClusterStateManager.INSTANCE.getClusterLocations(); } - public Pair splitProviderAndPartitionConstraintsForFilesIndex( - String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException { - return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints( - findDataset(dataverseName, datasetName), mdTxnCtx, targetIdxName, create); - } - public Pair buildExternalDataLookupRuntime( JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, boolean retainInput, IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, JobGenContext context, @@ -962,45 +809,26 @@ public class MetadataProvider implements IMetadataProvider ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype(); - ARecordType metaType = null; - if (dataset.hasMetaPart()) { - metaType = - (ARecordType) MetadataManager.INSTANCE - .getDatatype(metadataProvider.getMetadataTxnContext(), - dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()) - .getDatatype(); - } ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); LookupAdapterFactory adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory( getApplicationContext().getServiceContext(), datasetDetails.getProperties(), itemType, ridIndexes, retainInput, retainMissing, context.getMissingWriterFactory()); - - Pair> compactionInfo; - try { - compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); - } catch (MetadataException e) { - throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e); - } - - String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset); + String fileIndexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName()); + Pair spPc = + metadataProvider.getSplitProviderAndConstraints(dataset, fileIndexName); Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName); // Create the file index data flow helper - IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, - fileIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); + IIndexDataflowHelperFactory indexDataflowHelperFactory = + new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), spPc.first); // Create the out record descriptor, appContext and fileSplitProvider for the files index RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); - Pair spPc; - spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), - dataset.getDatasetName(), fileIndexName, false); ISearchOperationCallbackFactory searchOpCallbackFactory = dataset .getSearchCallbackFactory(storaegComponentProvider, fileIndex, jobId, IndexOperation.SEARCH, null); // Create the operator ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory, - outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(), - appContext.getStorageManager(), spPc.first, searchOpCallbackFactory, retainMissing, - context.getMissingWriterFactory(), metadataPageManagerFactory); + outRecDesc, indexDataflowHelperFactory, searchOpCallbackFactory, + ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this)); return new Pair<>(op, spPc.second); } catch (Exception e) { throw new AlgebricksException(e); @@ -1021,7 +849,6 @@ public class MetadataProvider implements IMetadataProvider } boolean temp = dataset.getDatasetDetails().isTemp(); isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp; - int numKeys = primaryKeys.size(); int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size(); @@ -1053,17 +880,11 @@ public class MetadataProvider implements IMetadataProvider try { Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); - String indexName = primaryIndex.getIndexName(); - String itemTypeName = dataset.getItemTypeName(); String itemTypeDataverseName = dataset.getItemTypeDataverseName(); ARecordType itemType = (ARecordType) MetadataManager.INSTANCE .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype(); ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); - ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType); - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); - IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, - itemType, metaItemType, context.getBinaryComparatorFactoryProvider()); Pair splitsAndConstraint = getSplitProviderAndConstraints(dataset); // prepare callback @@ -1078,19 +899,14 @@ public class MetadataProvider implements IMetadataProvider IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); - ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); - - Pair> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); - IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType, - metaItemType, compactionInfo.first, compactionInfo.second); - LSMTreeUpsertOperatorDescriptor op; - + IIndexDataflowHelperFactory idfh = + new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first); + LSMPrimaryUpsertOperatorDescriptor op; ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; - ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount() + ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; // add the previous record first @@ -1124,15 +940,10 @@ public class MetadataProvider implements IMetadataProvider outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j]; outputSerDes[j + f] = recordDesc.getFields()[j]; } - RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits); - op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName, - context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null, - metadataPageManagerFactory, dataset.getFrameOpCallbackFactory(), hasSecondaries); - op.setType(itemType); - op.setFilterIndex(fieldIdx); + op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh, + context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, + dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries); return new Pair<>(op, splitsAndConstraint.second); } catch (MetadataException me) { @@ -1212,7 +1023,7 @@ public class MetadataProvider implements IMetadataProvider private Pair getInsertOrDeleteRuntime(IndexOperation indexOp, IDataSource dataSource, IOperatorSchema propagatedSchema, List keys, - LogicalVariable payload, List additionalNonKeyFields, RecordDescriptor recordDesc, + LogicalVariable payload, List additionalNonKeyFields, RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload, List additionalNonFilteringFields) throws AlgebricksException { @@ -1250,15 +1061,6 @@ public class MetadataProvider implements IMetadataProvider try { Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); - String indexName = primaryIndex.getIndexName(); - ARecordType itemType = (ARecordType) MetadataManager.INSTANCE - .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype(); - ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); - ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType); - - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); - IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, - itemType, metaItemType, context.getBinaryComparatorFactoryProvider()); Pair splitsAndConstraint = getSplitProviderAndConstraints(dataset); @@ -1269,27 +1071,16 @@ public class MetadataProvider implements IMetadataProvider } IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields); - ISearchOperationCallbackFactory searchCallbackFactory = dataset - .getSearchCallbackFactory(storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields); - - Pair> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); - IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType, - metaItemType, compactionInfo.first, compactionInfo.second); + IIndexDataflowHelperFactory idfh = + new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, bloomFilterKeyFields, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, - metadataPageManagerFactory); + op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh); } else { - op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, null, true, - indexName, null, modificationCallbackFactory, searchCallbackFactory, - metadataPageManagerFactory); + op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, + null, true, modificationCallbackFactory); } return new Pair<>(op, splitsAndConstraint.second); } catch (MetadataException me) { @@ -1301,9 +1092,10 @@ public class MetadataProvider implements IMetadataProvider IndexOperation indexOp, IDataSourceIndex dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List primaryKeys, List secondaryKeys, - List additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc, - JobGenContext context, JobSpecification spec, boolean bulkload, List prevSecondaryKeys, - LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException { + List additionalNonKeyFields, ILogicalExpression filterExpr, + RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload, + List prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey) + throws AlgebricksException { String indexName = dataSourceIndex.getId(); String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName(); String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName(); @@ -1326,18 +1118,18 @@ public class MetadataProvider implements IMetadataProvider switch (secondaryIndex.getIndexType()) { case BTREE: return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, - secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp, + secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys); case RTREE: return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, - secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp, + secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys); case SINGLE_PARTITION_WORD_INVIX: case SINGLE_PARTITION_NGRAM_INVIX: case LENGTH_PARTITIONED_WORD_INVIX: case LENGTH_PARTITIONED_NGRAM_INVIX: return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, - secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp, + secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, secondaryIndex.getIndexType(), bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys); default: throw new AlgebricksException( @@ -1349,7 +1141,7 @@ public class MetadataProvider implements IMetadataProvider private Pair getBTreeRuntime(String dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema, List primaryKeys, List secondaryKeys, List additionalNonKeyFields, - AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context, + AsterixTupleFilterFactory filterFactory, RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp, boolean bulkload, List prevSecondaryKeys, List prevAdditionalFilteringKeys) throws AlgebricksException { Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); @@ -1361,14 +1153,12 @@ public class MetadataProvider implements IMetadataProvider // generate field permutations int[] fieldPermutation = new int[numKeys + numFilterFields]; - int[] bloomFilterKeyFields = new int[secondaryKeys.size()]; int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()]; int i = 0; int j = 0; for (LogicalVariable varKey : secondaryKeys) { int idx = propagatedSchema.findVariable(varKey); fieldPermutation[i] = idx; - bloomFilterKeyFields[i] = i; i++; } for (LogicalVariable varKey : primaryKeys) { @@ -1405,90 +1195,29 @@ public class MetadataProvider implements IMetadataProvider prevFieldPermutation[numKeys] = idx; } } - - String itemTypeName = dataset.getItemTypeName(); - ARecordType itemType; try { - itemType = (ARecordType) MetadataManager.INSTANCE - .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype(); - validateRecordType(itemType); - ARecordType metaType = - dataset.hasMetaPart() - ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx, - dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype() - : null; - // Index parameters. Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), indexName); - - ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType); - int[] filterFields; - int[] btreeFields; - if (filterTypeTraits != null) { - filterFields = new int[1]; - filterFields[0] = numKeys; - btreeFields = new int[numKeys]; - for (int k = 0; k < btreeFields.length; k++) { - btreeFields[k] = k; - } - } - - List> secondaryKeyNames = secondaryIndex.getKeyFieldNames(); - List secondaryKeyTypes = secondaryIndex.getKeyFieldTypes(); - ITypeTraits[] typeTraits = new ITypeTraits[numKeys]; - IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys]; - for (i = 0; i < secondaryKeys.size(); ++i) { - Pair keyPairType = - Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyNames.get(i), itemType); - IAType keyType = keyPairType.first; - comparatorFactories[i] = - BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); - typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); - } - List> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); - for (List partitioningKey : partitioningKeys) { - IAType keyType = itemType.getSubFieldType(partitioningKey); - comparatorFactories[i] = - BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); - typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); - ++i; - } - - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); Pair splitsAndConstraint = getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); - // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); - ISearchOperationCallbackFactory searchOpCallbackFactory = dataset.getSearchCallbackFactory( - storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); - Pair> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); - IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType, - metaType, compactionInfo.first, compactionInfo.second); + IIndexDataflowHelperFactory idfh = + new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, bloomFilterKeyFields, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, - metadataPageManagerFactory); + op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh); } else if (indexOp == IndexOperation.UPSERT) { - op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false, - indexName, null, modificationCallbackFactory, searchOpCallbackFactory, prevFieldPermutation, - metadataPageManagerFactory, dataset.getFrameOpCallbackFactory(), true); + op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, + filterFactory, modificationCallbackFactory, prevFieldPermutation); } else { - op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, filterFactory, - false, indexName, null, modificationCallbackFactory, searchOpCallbackFactory, - metadataPageManagerFactory); + op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, + filterFactory, false, modificationCallbackFactory); } return new Pair<>(op, splitsAndConstraint.second); } catch (Exception e) { @@ -1524,8 +1253,6 @@ public class MetadataProvider implements IMetadataProvider int numSecondaryKeys = dimension * 2; int numPrimaryKeys = primaryKeys.size(); int numKeys = numSecondaryKeys + numPrimaryKeys; - ITypeTraits[] typeTraits = new ITypeTraits[numKeys]; - IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys]; int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; int[] fieldPermutation = new int[numKeys + numFilterFields]; @@ -1573,75 +1300,27 @@ public class MetadataProvider implements IMetadataProvider prevFieldPermutation[numKeys] = idx; } } - - IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag()); - IPrimitiveValueProviderFactory[] valueProviderFactories = - new IPrimitiveValueProviderFactory[numSecondaryKeys]; - for (i = 0; i < numSecondaryKeys; i++) { - comparatorFactories[i] = - BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true); - typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType); - valueProviderFactories[i] = primitiveValueProviderFactory; - } - List> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); - for (List partitioningKey : partitioningKeys) { - IAType keyType = recType.getSubFieldType(partitioningKey); - typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); - ++i; - } - ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); - IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories( - dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider()); - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); Pair splitsAndConstraint = getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); - int[] btreeFields = new int[primaryComparatorFactories.length]; - for (int k = 0; k < btreeFields.length; k++) { - btreeFields[k] = k + numSecondaryKeys; - } - - ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recType); - int[] filterFields; - int[] rtreeFields; - if (filterTypeTraits != null) { - filterFields = new int[1]; - filterFields[0] = numSecondaryKeys + numPrimaryKeys; - rtreeFields = new int[numSecondaryKeys + numPrimaryKeys]; - for (int k = 0; k < rtreeFields.length; k++) { - rtreeFields[k] = k; - } - } // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); - ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); - Pair> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); - IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, - secondaryIndex, recType, metaItemType, compactionInfo.first, compactionInfo.second); + IIndexDataflowHelperFactory indexDataflowHelperFactory = + new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - primaryComparatorFactories, btreeFields, fieldPermutation, + op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, - indexDataflowHelperFactory, metadataPageManagerFactory); + indexDataflowHelperFactory); } else if (indexOp == IndexOperation.UPSERT) { - op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false, - indexName, null, modificationCallbackFactory, searchCallbackFactory, prevFieldPermutation, - metadataPageManagerFactory, dataset.getFrameOpCallbackFactory(), true); + op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, + indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation); } else { - op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, - comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory, - false, indexName, null, modificationCallbackFactory, searchCallbackFactory, - metadataPageManagerFactory); + op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp, + indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory); } return new Pair<>(op, splitsAndConstraint.second); } catch (MetadataException e) { @@ -1733,87 +1412,10 @@ public class MetadataProvider implements IMetadataProvider prevFieldPermutation[numKeys] = idx; } } - - String itemTypeName = dataset.getItemTypeName(); - IAType itemType; try { - itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName) - .getDatatype(); - validateRecordType(itemType); - ARecordType recType = (ARecordType) itemType; - // Index parameters. Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), indexName); - - List> secondaryKeyExprs = secondaryIndex.getKeyFieldNames(); - List secondaryKeyTypes = secondaryIndex.getKeyFieldTypes(); - - int numTokenFields = 0; - - // SecondaryKeys.size() can be two if it comes from the bulkload. - // In this case, [token, number of token] are the secondaryKeys. - if (!isPartitioned || (secondaryKeys.size() > 1)) { - numTokenFields = secondaryKeys.size(); - } else if (isPartitioned && (secondaryKeys.size() == 1)) { - numTokenFields = secondaryKeys.size() + 1; - } - - ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); - ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields]; - ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()]; - IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields]; - IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories( - dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider()); - - IAType secondaryKeyType; - Pair keyPairType = - Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType); - secondaryKeyType = keyPairType.first; - - List> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); - - i = 0; - for (List partitioningKey : partitioningKeys) { - IAType keyType = recType.getSubFieldType(partitioningKey); - invListsTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); - ++i; - } - - tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType); - tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType); - if (isPartitioned) { - // The partitioning field is hardcoded to be a short *without* - // an Asterix type tag. - tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY); - tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS; - } - IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory( - secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength()); - - ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recType); - - int[] filterFields; - int[] invertedIndexFields; - int[] filterFieldsForNonBulkLoadOps; - int[] invertedIndexFieldsForNonBulkLoadOps; - if (filterTypeTraits != null) { - filterFields = new int[1]; - filterFields[0] = numTokenFields + primaryKeys.size(); - invertedIndexFields = new int[numTokenFields + primaryKeys.size()]; - for (int k = 0; k < invertedIndexFields.length; k++) { - invertedIndexFields[k] = k; - } - filterFieldsForNonBulkLoadOps = new int[numFilterFields]; - //for non-bulk-loads, there is only in the incoming tuples - filterFieldsForNonBulkLoadOps[0] = numKeys; - invertedIndexFieldsForNonBulkLoadOps = new int[numKeys]; - for (int k = 0; k < invertedIndexFieldsForNonBulkLoadOps.length; k++) { - invertedIndexFieldsForNonBulkLoadOps[k] = k; - } - } - - ICcApplicationContext appContext = (ICcApplicationContext) context.getAppContext(); Pair splitsAndConstraint = getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); @@ -1821,33 +1423,19 @@ public class MetadataProvider implements IMetadataProvider JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); - ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); - Pair> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); - IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this, - secondaryIndex, recType, metaItemType, compactionInfo.first, compactionInfo.second); + IIndexDataflowHelperFactory indexDataFlowFactory = + new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false, - numElementsHint, false, appContext.getStorageManager(), splitsAndConstraint.first, - appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories, - invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory, - metadataPageManagerFactory); + op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory); } else if (indexOp == IndexOperation.UPSERT) { - op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), - splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, - tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, - fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory, - searchCallbackFactory, indexName, prevFieldPermutation, metadataPageManagerFactory); + op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory, + filterFactory, modificationCallbackFactory, prevFieldPermutation); } else { - op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc, - appContext.getStorageManager(), splitsAndConstraint.first, - appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories, - invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp, - indexDataFlowFactory, filterFactory, modificationCallbackFactory, searchCallbackFactory, - indexName, metadataPageManagerFactory); + op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp, + indexDataFlowFactory, filterFactory, false, modificationCallbackFactory); } return new Pair<>(op, splitsAndConstraint.second); } catch (Exception e) { @@ -1973,7 +1561,7 @@ public class MetadataProvider implements IMetadataProvider Pair keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0), secondaryKeyExprs.get(0), recType); secondaryKeyType = keyPairType.first; - List> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); + List> partitioningKeys = dataset.getPrimaryKeys(); i = 0; for (List partitioningKey : partitioningKeys) { IAType keyType = recType.getSubFieldType(partitioningKey);