Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A11F418C7A for ; Tue, 25 Aug 2015 16:44:03 +0000 (UTC) Received: (qmail 77767 invoked by uid 500); 25 Aug 2015 16:44:03 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 77734 invoked by uid 500); 25 Aug 2015 16:44:03 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 77681 invoked by uid 99); 25 Aug 2015 16:44:03 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 16:44:03 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id E6DA6EDCAF for ; Tue, 25 Aug 2015 16:44:02 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.442 X-Spam-Level: X-Spam-Status: No, score=0.442 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.338] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id FLiImLlYz1wY for ; Tue, 25 Aug 2015 16:43:57 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 606B2261F2 for ; Tue, 25 Aug 2015 16:43:50 +0000 (UTC) Received: (qmail 75795 invoked by uid 99); 25 Aug 2015 16:43:50 -0000 
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 16:43:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D16D4E01F5; Tue, 25 Aug 2015 16:43:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: imaxon@apache.org To: commits@asterixdb.incubator.apache.org Date: Tue, 25 Aug 2015 16:44:02 -0000 Message-Id: <0e24fcff7e5348b78c86bfd73755c2c4@git.apache.org> In-Reply-To: <82235e2314bc4ce9bc242ea1878b4eae@git.apache.org> References: <82235e2314bc4ce9bc242ea1878b4eae@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java deleted file mode 100644 index ffecc8e..0000000 --- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Copyright 2009-2013 by The Regents of the University of California - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * you may obtain a copy of the License from - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.uci.ics.asterix.file; - -import java.util.List; - -import edu.uci.ics.asterix.common.api.ILocalResourceMetadata; -import edu.uci.ics.asterix.common.config.AsterixStorageProperties; -import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType; -import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType; -import edu.uci.ics.asterix.common.config.GlobalConfig; -import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider; -import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider; -import edu.uci.ics.asterix.common.exceptions.AsterixException; -import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; -import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider; -import edu.uci.ics.asterix.metadata.entities.Index; -import edu.uci.ics.asterix.metadata.external.IndexingConstants; -import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor; -import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry; -import edu.uci.ics.asterix.om.types.IAType; -import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; -import edu.uci.ics.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadata; -import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata; -import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider; -import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; -import 
edu.uci.ics.hyracks.algebricks.common.utils.Pair; -import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; -import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; -import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; -import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider; -import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider; -import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory; -import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; -import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; -import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; -import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer; -import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits; -import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; -import edu.uci.ics.hyracks.api.job.JobSpecification; -import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import 
edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory; -import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; -import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; -import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import edu.uci.ics.hyracks.storage.common.file.LocalResource; - -public class SecondaryBTreeOperationsHelper extends SecondaryIndexOperationsHelper { - - protected SecondaryBTreeOperationsHelper(PhysicalOptimizationConfig physOptConf, - IAsterixPropertiesProvider propertiesProvider) { - super(physOptConf, propertiesProvider); - } - - @Override - public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties(); - ILocalResourceFactoryProvider localResourceFactoryProvider; - IIndexDataflowHelperFactory indexDataflowHelperFactory; - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - //prepare a LocalResourceMetadata which will be stored in NC's local resource repository - ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(secondaryTypeTraits, - secondaryComparatorFactories, secondaryBloomFilterKeyFields, true, dataset.getDatasetId(), - mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, filterCmpFactories, - secondaryBTreeFields, secondaryFilterFields); - localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata, - LocalResource.LSMBTreeResource); - // The index create operation should be persistent regardless of temp datasets or permanent dataset. 
- indexDataflowHelperFactory = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider( - dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories, - secondaryBTreeFields, secondaryFilterFields, true); - } else { - // External dataset local resource and dataflow helper - int[] buddyBreeFields = new int[] { numSecondaryKeys }; - ILocalResourceMetadata localResourceMetadata = new ExternalBTreeWithBuddyLocalResourceMetadata( - dataset.getDatasetId(), secondaryComparatorFactories, secondaryTypeTraits, mergePolicyFactory, - mergePolicyFactoryProperties, buddyBreeFields); - localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata, - LocalResource.ExternalBTreeWithBuddyResource); - indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, - mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), buddyBreeFields, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true); - } - TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, - secondaryBloomFilterKeyFields, indexDataflowHelperFactory, localResourceFactoryProvider, - NoOpOperationCallbackFactory.INSTANCE); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp, 
- secondaryPartitionConstraint); - spec.addRoot(secondaryIndexCreateOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - @Override - public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - /* - * In case of external data, this method is used to build loading jobs for both initial load on index creation - * and transaction load on dataset referesh - */ - - // Create external indexing scan operator - ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec); - - // Assign op. - AbstractOperatorDescriptor sourceOp = primaryScanOp; - if (isEnforcingKeyTypes) { - sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType()); - spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); - } - AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec, numSecondaryKeys); - - // If any of the secondary fields are nullable, then add a select op that filters nulls. - AlgebricksMetaOperatorDescriptor selectOp = null; - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys); - } - - // Sort by secondary keys. - ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc); - - AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties(); - // Create secondary BTree bulk load op. 
- AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp; - ExternalBTreeWithBuddyDataflowHelperFactory dataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory( - mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider( - dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), new int[] { numSecondaryKeys }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true); - IOperatorDescriptor root; - if (externalFiles != null) { - // Transaction load - secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, numSecondaryKeys, dataflowHelperFactory, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - root = secondaryBulkLoadOp; - } else { - // Initial load - secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numSecondaryKeys, dataflowHelperFactory, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, - new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, - new RecordDescriptor[] { secondaryRecDesc }); - spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); - root = metaOp; - } - spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0); - } else { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0); - } - spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0); - spec.addRoot(root); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } else { - // Create dummy key provider for feeding the primary index 
scan. - AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec); - - // Create primary index scan op. - BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec); - - // Assign op. - AbstractOperatorDescriptor sourceOp = primaryScanOp; - if (isEnforcingKeyTypes) { - sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType()); - spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); - } - AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys); - - // If any of the secondary fields are nullable, then add a select op that filters nulls. - AlgebricksMetaOperatorDescriptor selectOp = null; - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys); - } - - // Sort by secondary keys. - ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc); - - AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties(); - boolean temp = dataset.getDatasetDetails().isTemp(); - // Create secondary BTree bulk load op. 
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp( - spec, - numSecondaryKeys, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties - .getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories, - secondaryBTreeFields, secondaryFilterFields, !temp), GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - - AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, - new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] { secondaryRecDesc }); - // Connect the operators. - spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0); - } else { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0); - } - spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); - spec.addRoot(metaOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - } - - @Override - protected int getNumSecondaryKeys() { - return numSecondaryKeys; - } - - @Override - public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - AsterixStorageProperties storageProperties = 
propertiesProvider.getStorageProperties(); - boolean temp = dataset.getDatasetDetails().isTemp(); - LSMTreeIndexCompactOperatorDescriptor compactOp; - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits, - secondaryComparatorFactories, secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory( - new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory, - mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider( - dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, - filterCmpFactories, secondaryBTreeFields, secondaryFilterFields, !temp), - NoOpOperationCallbackFactory.INSTANCE); - } else { - // External dataset - compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits, - secondaryComparatorFactories, secondaryBloomFilterKeyFields, - new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, storageProperties - .getBloomFilterFalsePositiveRate(), new int[] { numSecondaryKeys }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true), - NoOpOperationCallbackFactory.INSTANCE); - } - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - secondaryPartitionConstraint); - spec.addRoot(compactOp); - spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy()); - return spec; - } - - @Override - @SuppressWarnings("rawtypes") - protected void setSecondaryRecDescAndComparators(IndexType indexType, List> secondaryKeyFields, - List secondaryKeyTypes, int gramLength, AqlMetadataProvider metadataProvider) - throws AlgebricksException, AsterixException { - secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys + numFilterFields]; - secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys]; - secondaryBloomFilterKeyFields = new int[numSecondaryKeys]; - ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys - + numFilterFields]; - ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields]; - secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys]; - ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys]; - ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider(); - ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider(); - IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat() - .getBinaryComparatorFactoryProvider(); - // Record column is 0 for external datasets, numPrimaryKeys for internal ones - int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0; - for (int i = 0; i < numSecondaryKeys; i++) { - secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory( - isEnforcingKeyTypes ? 
enforcedItemType : itemType, secondaryKeyFields.get(i), recordColumn); - Pair keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), - secondaryKeyFields.get(i), itemType); - IAType keyType = keyTypePair.first; - anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second; - ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType); - secondaryRecFields[i] = keySerde; - secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true); - secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType); - secondaryBloomFilterKeyFields[i] = i; - } - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - // Add serializers and comparators for primary index fields. - for (int i = 0; i < numPrimaryKeys; i++) { - secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i]; - enforcedRecFields[i] = primaryRecDesc.getFields()[i]; - secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i]; - enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i]; - secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i]; - } - } else { - // Add serializers and comparators for RID fields. 
- for (int i = 0; i < numPrimaryKeys; i++) { - secondaryRecFields[numSecondaryKeys + i] = IndexingConstants.getSerializerDeserializer(i); - enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i); - secondaryTypeTraits[numSecondaryKeys + i] = IndexingConstants.getTypeTraits(i); - enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i); - secondaryComparatorFactories[numSecondaryKeys + i] = IndexingConstants.getComparatorFactory(i); - } - } - enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType); - - if (numFilterFields > 0) { - secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat() - .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys); - Pair keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType); - IAType type = keyTypePair.first; - ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type); - secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde; - } - - secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits); - enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java deleted file mode 100644 index 07c8bab..0000000 --- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java +++ /dev/null @@ -1,575 +0,0 @@ -/* - * Copyright 2009-2013 by The Regents of the University of California - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * you may obtain a copy of the License from - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package edu.uci.ics.asterix.file; - -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import edu.uci.ics.asterix.common.config.AsterixStorageProperties; -import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType; -import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp; -import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType; -import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider; -import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider; -import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider; -import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider; -import edu.uci.ics.asterix.common.exceptions.AsterixException; -import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType; -import edu.uci.ics.asterix.common.transactions.JobId; -import edu.uci.ics.asterix.external.indexing.operators.ExternalIndexBulkModifyOperatorDescriptor; -import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl; -import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider; -import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider; -import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider; -import edu.uci.ics.asterix.metadata.MetadataException; -import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider; -import 
edu.uci.ics.asterix.metadata.entities.Dataset; -import edu.uci.ics.asterix.metadata.entities.ExternalFile; -import edu.uci.ics.asterix.metadata.external.IndexingConstants; -import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor; -import edu.uci.ics.asterix.metadata.utils.DatasetUtils; -import edu.uci.ics.asterix.om.types.ARecordType; -import edu.uci.ics.asterix.om.types.IAType; -import edu.uci.ics.asterix.om.util.AsterixAppContextInfo; -import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor; -import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor; -import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor; -import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor; -import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory; -import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; -import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider; -import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider; -import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory; -import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; -import edu.uci.ics.hyracks.algebricks.common.utils.Pair; -import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter; -import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; -import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider; -import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory; 
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory; -import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; -import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; -import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory; -import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer; -import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits; -import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; -import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; -import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory; -import edu.uci.ics.hyracks.api.job.JobSpecification; -import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; -import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider; -import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor; -import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; -import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; -import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -

/**
 * Base class for helpers that build the Hyracks job specifications used to create, load and compact a
 * secondary index of a dataset.
 * <p>
 * Instances are obtained through {@link #createIndexOperationsHelper}. That factory picks the concrete subclass
 * for the requested {@link IndexType} and then calls {@link #init}. Subclasses supply the index-specific record
 * descriptors, comparators and job specs. This class provides the shared operator factories: primary-index scan,
 * assign, cast, sort, null filter, bulk load, and the external-dataset variants.
 */
@SuppressWarnings("rawtypes")
// TODO: We should eventually have a hierarchy of classes that can create all
// possible index job specs, not just for creation.
public abstract class SecondaryIndexOperationsHelper {
    protected final PhysicalOptimizationConfig physOptConf;

    protected int numPrimaryKeys;
    protected int numSecondaryKeys;
    protected AqlMetadataProvider metadataProvider;
    protected String dataverseName;
    protected String datasetName;
    protected Dataset dataset;
    protected ARecordType itemType;
    protected ISerializerDeserializer payloadSerde;
    protected IFileSplitProvider primaryFileSplitProvider;
    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
    protected IFileSplitProvider secondaryFileSplitProvider;
    protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
    protected String secondaryIndexName;
    protected boolean anySecondaryKeyIsNullable = false;
    protected boolean isEnforcingKeyTypes = false;

    protected long numElementsHint;
    protected IBinaryComparatorFactory[] primaryComparatorFactories;
    protected int[] primaryBloomFilterKeyFields;
    protected RecordDescriptor primaryRecDesc;
    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
    protected ITypeTraits[] secondaryTypeTraits;
    protected int[] secondaryBloomFilterKeyFields;
    protected RecordDescriptor secondaryRecDesc;
    protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;

    protected IAsterixPropertiesProvider propertiesProvider;
    protected ILSMMergePolicyFactory mergePolicyFactory;
    protected Map<String, String> mergePolicyFactoryProperties;
    protected RecordDescriptor enforcedRecDesc;
    protected ARecordType enforcedItemType;

    // Filter-related state. It is only set up by init() for non-external datasets that declare a filter field.
    protected int numFilterFields;
    protected List<String> filterFieldName;
    protected ITypeTraits[] filterTypeTraits;
    protected IBinaryComparatorFactory[] filterCmpFactories;
    protected int[] secondaryFilterFields;
    protected int[] primaryFilterFields;
    protected int[] primaryBTreeFields;
    protected int[] secondaryBTreeFields;

    // Files of an external dataset, supplied via setExternalFiles(). Read by createExternalIndexingOp() and
    // createExternalIndexBulkModifyOp().
    protected List<ExternalFile> externalFiles;

    // Prevent public construction. Instances should be created via createIndexOperationsHelper().
    protected SecondaryIndexOperationsHelper(PhysicalOptimizationConfig physOptConf,
            IAsterixPropertiesProvider propertiesProvider) {
        this.physOptConf = physOptConf;
        this.propertiesProvider = propertiesProvider;
    }

    /**
     * Creates and initializes the helper subclass matching {@code indexType}.
     *
     * @return a helper on which {@link #init} has already been called
     * @throws AsterixException
     *             if {@code indexType} has no helper implementation, or if initialization fails
     * @throws AlgebricksException
     *             if initialization fails
     */
    public static SecondaryIndexOperationsHelper createIndexOperationsHelper(IndexType indexType, String dataverseName,
            String datasetName, String indexName, List<List<String>> secondaryKeyFields,
            List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
            PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType enforcedType)
            throws AsterixException, AlgebricksException {
        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
        SecondaryIndexOperationsHelper indexOperationsHelper = null;
        switch (indexType) {
            case BTREE: {
                indexOperationsHelper = new SecondaryBTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
                break;
            }
            case RTREE: {
                indexOperationsHelper = new SecondaryRTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
                break;
            }
            case SINGLE_PARTITION_WORD_INVIX:
            case SINGLE_PARTITION_NGRAM_INVIX:
            case LENGTH_PARTITIONED_WORD_INVIX:
            case LENGTH_PARTITIONED_NGRAM_INVIX: {
                indexOperationsHelper = new SecondaryInvertedIndexOperationsHelper(physOptConf,
                        asterixPropertiesProvider);
                break;
            }
            default: {
                throw new AsterixException("Unknown Index Type: " + indexType);
            }
        }
        indexOperationsHelper.init(indexType, dataverseName, datasetName, indexName, secondaryKeyFields,
                secondaryKeyTypes, isEnforced, gramLength, metadataProvider, recType, enforcedType);
        return indexOperationsHelper;
    }

    /** Builds the job that creates the (empty) secondary index. */
    public abstract JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException;

    /** Builds the job that populates the secondary index from the dataset. */
    public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;

    /** Builds the job that compacts the secondary index. */
    public abstract JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException;

    /**
     * Resolves the dataset and computes all state shared by the job builders.
     * <p>
     * This covers split providers, partition constraints, record descriptors, comparators, the cardinality hint,
     * the merge policy and, when present, the filter field.
     *
     * @param dvn
     *            dataverse name; when {@code null}, the metadata provider's default dataverse is used
     * @param dsn
     *            dataset name
     * @param in
     *            secondary index name
     * @throws AsterixException
     *             if the dataset cannot be found
     */
    protected void init(IndexType indexType, String dvn, String dsn, String in, List<List<String>> secondaryKeyFields,
            List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
            ARecordType aRecType, ARecordType enforcedType) throws AsterixException, AlgebricksException {
        this.metadataProvider = metadataProvider;
        dataverseName = dvn == null ? metadataProvider.getDefaultDataverseName() : dvn;
        datasetName = dsn;
        secondaryIndexName = in;
        isEnforcingKeyTypes = isEnforced;
        dataset = metadataProvider.findDataset(dataverseName, datasetName);
        if (dataset == null) {
            throw new AsterixException("Unknown dataset " + datasetName);
        }
        boolean temp = dataset.getDatasetDetails().isTemp();
        itemType = aRecType;
        enforcedItemType = enforcedType;
        payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
        numSecondaryKeys = secondaryKeyFields.size();
        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, secondaryIndexName, temp);
        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;

        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
            // External datasets are keyed by record IDs (RIDs) instead of partitioning keys.
            numPrimaryKeys = ExternalIndexingOperations.getRIDSize(dataset);
        } else {
            filterFieldName = DatasetUtils.getFilterField(dataset);
            if (filterFieldName != null) {
                numFilterFields = 1;
            } else {
                numFilterFields = 0;
            }

            numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
            // The primary index is looked up under the dataset's own name.
            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
            primaryFileSplitProvider = primarySplitsAndConstraint.first;
            primaryPartitionConstraint = primarySplitsAndConstraint.second;
            setPrimaryRecDescAndComparators();
        }
        setSecondaryRecDescAndComparators(indexType, secondaryKeyFields, secondaryKeyTypes, gramLength,
                metadataProvider);
        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                metadataProvider.getMetadataTxnContext());
        mergePolicyFactory = compactionInfo.first;
        mergePolicyFactoryProperties = compactionInfo.second;

        if (numFilterFields > 0) {
            setFilterTypeTraitsAndComparators();
        }
    }

    /**
     * Computes the type traits, comparator and field positions for the single filter field.
     * <p>
     * In the primary index, the filter sits right after the primary keys and the record ({@code numPrimaryKeys + 1}).
     * In the secondary index, it sits right after the secondary and primary keys.
     */
    protected void setFilterTypeTraitsAndComparators() throws AlgebricksException {
        filterTypeTraits = new ITypeTraits[numFilterFields];
        filterCmpFactories = new IBinaryComparatorFactory[numFilterFields];
        secondaryFilterFields = new int[numFilterFields];
        primaryFilterFields = new int[numFilterFields];
        primaryBTreeFields = new int[numPrimaryKeys + 1];
        secondaryBTreeFields = new int[numSecondaryKeys + numPrimaryKeys];
        for (int i = 0; i < primaryBTreeFields.length; i++) {
            primaryBTreeFields[i] = i;
        }
        for (int i = 0; i < secondaryBTreeFields.length; i++) {
            secondaryBTreeFields[i] = i;
        }

        IAType type;
        try {
            type = itemType.getSubFieldType(filterFieldName);
        } catch (IOException e) {
            throw new AlgebricksException(e);
        }
        filterCmpFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
        filterTypeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(type);
        secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
        primaryFilterFields[0] = numPrimaryKeys + 1;
    }

    /** Number of secondary-key fields as laid out in this index type's tuples. */
    protected abstract int getNumSecondaryKeys();

    /**
     * Builds the primary index record descriptor and its comparators.
     * <p>
     * The record layout is {@code [pk_0 .. pk_{n-1}, record]}. Only the primary keys get comparators, and all of
     * them are used as Bloom filter key fields.
     */
    protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
        int numPartitioningKeys = partitioningKeys.size();
        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPartitioningKeys + 1];
        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPartitioningKeys + 1];
        primaryComparatorFactories = new IBinaryComparatorFactory[numPartitioningKeys];
        primaryBloomFilterKeyFields = new int[numPartitioningKeys];
        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
        for (int i = 0; i < numPartitioningKeys; i++) {
            IAType keyType;
            try {
                keyType = itemType.getSubFieldType(partitioningKeys.get(i));
            } catch (IOException e) {
                throw new AlgebricksException(e);
            }
            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
                    keyType, true);
            primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
            primaryBloomFilterKeyFields[i] = i;
        }
        primaryRecFields[numPartitioningKeys] = payloadSerde;
        primaryTypeTraits[numPartitioningKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
        primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
    }

    /** Index-type specific: fills the secondary record descriptor, comparators and field-access evaluators. */
    protected abstract void setSecondaryRecDescAndComparators(IndexType indexType,
            List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes, int gramLength,
            AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException;

    /**
     * Creates a constant source that emits one dummy tuple.
     * <p>
     * The tuple has a single integer field with value 0 and is used to trigger the primary index scan. The source
     * is constrained to the primary partitions.
     */
    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AsterixException,
            AlgebricksException {
        // Build dummy tuple containing one field with a dummy value inside.
        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
        DataOutput dos = tb.getDataOutput();
        tb.reset();
        try {
            // Serialize dummy value into a field.
            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
        } catch (HyracksDataException e) {
            throw new AsterixException(e);
        }
        // Add dummy field.
        tb.addFieldEndOffset();
        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
                primaryPartitionConstraint);
        return keyProviderOp;
    }

    /**
     * Creates a full scan of the primary LSM B-tree.
     * <p>
     * The low and high keys are both {@code null}, i.e. -infinity to +infinity. As a side effect, this assigns a
     * fresh job id to the metadata provider and registers a joblet event listener on {@code spec}. Temporary
     * datasets use a no-op search callback; other datasets use an instant-lock search callback.
     */
    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
        // -Infinity
        int[] lowKeyFields = null;
        // +Infinity
        int[] highKeyFields = null;
        ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
        JobId jobId = JobIdFactory.generateJobId();
        metadataProvider.setJobId(jobId);
        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
        spec.setJobletEventListenerFactory(jobEventListenerFactory);

        boolean temp = dataset.getDatasetDetails().isTemp();
        ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
                        primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                        mergePolicyFactory, mergePolicyFactoryProperties, new PrimaryIndexOperationTrackerProvider(
                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                        LSMBTreeIOOperationCallbackFactory.INSTANCE,
                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
                        filterCmpFactories, primaryBTreeFields, primaryFilterFields, !temp), false, false, null,
                searchCallbackFactory, null, null);

        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
                primaryPartitionConstraint);
        return primarySearchOp;
    }

    /**
     * Creates the assign operator that extracts secondary keys (and the optional filter) from the record.
     * <p>
     * Output layout is {@code [secondary keys..., primary keys..., filter?]}.
     *
     * @param primaryScanOp
     *            unused; retained for API compatibility
     */
    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
            AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
        int[] outColumns = new int[numSecondaryKeyFields + numFilterFields];
        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
        for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) {
            outColumns[i] = numPrimaryKeys + i;
        }
        int projCount = 0;
        for (int i = 0; i < numSecondaryKeyFields; i++) {
            projectionList[projCount++] = numPrimaryKeys + i;
        }
        for (int i = 0; i < numPrimaryKeys; i++) {
            projectionList[projCount++] = i;
        }
        if (numFilterFields > 0) {
            projectionList[projCount++] = numPrimaryKeys + numSecondaryKeyFields;
        }

        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
            sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
                    secondaryFieldAccessEvalFactories[i]);
        }
        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
                primaryPartitionConstraint);
        return asterixAssignOp;
    }

    /**
     * Creates an assign operator that casts each record from {@code itemType} to {@code enforcedItemType}.
     * <p>
     * Unlike most operator factories in this class, this one sets no partition constraint on the returned
     * operator.
     *
     * @param primaryScanOp
     *            unused; retained for API compatibility
     * @param numSecondaryKeyFields
     *            unused; retained for API compatibility
     * @param dsType
     *            for {@code EXTERNAL} the record is the first input field, otherwise it follows the primary keys
     */
    protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec,
            AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields, DatasetType dsType) {
        CastRecordDescriptor castFuncDesc = (CastRecordDescriptor) CastRecordDescriptor.FACTORY
                .createFunctionDescriptor();
        castFuncDesc.reset(enforcedItemType, itemType);

        int[] outColumns = new int[1];
        int[] projectionList = new int[1 + numPrimaryKeys];
        int recordIdx;
        // The external data scan operator returns the record as the first field, instead of the last as in the
        // internal case.
        if (dsType == DatasetType.EXTERNAL) {
            recordIdx = 0;
            outColumns[0] = 0;
        } else {
            recordIdx = numPrimaryKeys;
            outColumns[0] = numPrimaryKeys;
        }
        for (int i = 0; i <= numPrimaryKeys; i++) {
            projectionList[i] = i;
        }
        ICopyEvaluatorFactory[] castEvalFact = new ICopyEvaluatorFactory[] { new ColumnAccessEvalFactory(recordIdx) };
        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
        sefs[0] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
                castFuncDesc.createEvaluatorFactory(castEvalFact));
        AssignRuntimeFactory castAssign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
        AlgebricksMetaOperatorDescriptor castRecAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                new IPushRuntimeFactory[] { castAssign }, new RecordDescriptor[] { enforcedRecDesc });

        return castRecAssignOp;
    }

    /**
     * Creates an external sort on the leading {@code secondaryComparatorFactories.length} fields.
     * <p>
     * The sort is constrained to the primary partitions.
     */
    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
        int[] sortFields = new int[secondaryComparatorFactories.length];
        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
            sortFields[i] = i;
        }
        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
        return sortOp;
    }

    /**
     * Creates the bulk-load operator for the secondary tree index.
     * <p>
     * Input fields are consumed in order (identity permutation over secondary keys, primary keys and filter). The
     * operator is constrained to the secondary partitions.
     */
    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
            int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
            throws MetadataException, AlgebricksException {
        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
        for (int i = 0; i < fieldPermutation.length; i++) {
            fieldPermutation[i] = i;
        }
        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
                secondaryRecDesc, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
                secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
                fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory);
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                secondaryPartitionConstraint);
        return treeIndexBulkLoadOp;
    }

    /**
     * Creates a select operator that drops tuples where any of the first {@code numSecondaryKeyFields} fields is
     * null.
     * <p>
     * NOTE(review): assumes {@code numSecondaryKeyFields >= 1}; with 0 the access to
     * {@code andArgsEvalFactories[0]} fails.
     */
    public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields)
            throws AlgebricksException {
        ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
        NotDescriptor notDesc = new NotDescriptor();
        IsNullDescriptor isNullDesc = new IsNullDescriptor();
        for (int i = 0; i < numSecondaryKeyFields; i++) {
            // Access column i, and apply 'is not null'.
            ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
            ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
                    .createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
            ICopyEvaluatorFactory notEvalFactory = notDesc
                    .createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
            andArgsEvalFactories[i] = notEvalFactory;
        }
        ICopyEvaluatorFactory selectCond = null;
        if (numSecondaryKeyFields > 1) {
            // Create conjunctive condition where all secondary index keys must
            // satisfy 'is not null'.
            AndDescriptor andDesc = new AndDescriptor();
            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
        } else {
            selectCond = andArgsEvalFactories[0];
        }
        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
                new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(selectCond),
                null, AqlBinaryBooleanInspectorImpl.FACTORY, false, -1, null);
        AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
                primaryPartitionConstraint);
        return asterixSelectOp;
    }

    /**
     * Creates the source operator that scans an external dataset.
     * <p>
     * It emits {@code [record, rid_0 .. rid_{n-1}]}. Side effect: {@code primaryPartitionConstraint} is replaced
     * by the scan's own constraint, so later operators built with it are co-located with the scan.
     *
     * @throws AlgebricksException
     *             wrapping any failure raised while building the external indexing operator
     */
    protected ExternalDataScanOperatorDescriptor createExternalIndexingOp(JobSpecification spec)
            throws AlgebricksException, AsterixException {
        // A record + primary keys
        ISerializerDeserializer[] serdes = new ISerializerDeserializer[1 + numPrimaryKeys];
        ITypeTraits[] typeTraits = new ITypeTraits[1 + numPrimaryKeys];
        // payload serde and type traits for the record slot
        serdes[0] = payloadSerde;
        typeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
        // serdes and type traits for rid fields
        for (int i = 1; i < serdes.length; i++) {
            serdes[i] = IndexingConstants.getSerializerDeserializer(i - 1);
            typeTraits[i] = IndexingConstants.getTypeTraits(i - 1);
        }
        // output record desc
        RecordDescriptor indexerDesc = new RecordDescriptor(serdes, typeTraits);

        // Create the operator and its partition constraints
        Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints;
        try {
            indexingOpAndConstraints = ExternalIndexingOperations.createExternalIndexingOp(spec, metadataProvider,
                    dataset, itemType, indexerDesc, externalFiles);
        } catch (Exception e) {
            throw new AlgebricksException(e);
        }
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
                indexingOpAndConstraints.second);

        // Set the primary partition constraints to this partition constraints
        primaryPartitionConstraint = indexingOpAndConstraints.second;
        return indexingOpAndConstraints.first;
    }

    /**
     * Creates the assign operator for external datasets.
     * <p>
     * Secondary keys are extracted from a {@code [record, rids...]} input, and the output is projected to
     * {@code [secondary keys..., rids...]}. No partition constraint is set on the returned operator.
     */
    protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys)
            throws AlgebricksException {
        int[] outColumns = new int[numSecondaryKeys];
        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
        for (int i = 0; i < numSecondaryKeys; i++) {
            outColumns[i] = i + numPrimaryKeys + 1;
            projectionList[i] = i + numPrimaryKeys + 1;
        }

        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
            sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
                    secondaryFieldAccessEvalFactories[i]);
        }
        // Add external RIDs to the projection list.
        for (int i = 0; i < numPrimaryKeys; i++) {
            projectionList[numSecondaryKeys + i] = i + 1;
        }

        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
        return asterixAssignOp;
    }

    /**
     * Creates the bulk-modify operator for a secondary index of an external dataset.
     * <p>
     * The file numbers of all {@code externalFiles} whose pending operation is {@code PENDING_DROP_OP} are passed
     * to the operator as the deleted-files list. The operator is constrained to the secondary partitions.
     */
    protected ExternalIndexBulkModifyOperatorDescriptor createExternalIndexBulkModifyOp(JobSpecification spec,
            int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
            throws MetadataException, AlgebricksException {
        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
        for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
            fieldPermutation[i] = i;
        }
        // Collect the file numbers of the files pending deletion.
        int numOfDeletedFiles = 0;
        for (ExternalFile file : externalFiles) {
            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
                numOfDeletedFiles++;
            }
        }
        int[] deletedFiles = new int[numOfDeletedFiles];
        int next = 0;
        for (ExternalFile file : externalFiles) {
            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
                // Advance the write position for each match so every deleted file gets its own slot.
                deletedFiles[next++] = file.getFileNumber();
            }
        }
        ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkModifyOp = new ExternalIndexBulkModifyOperatorDescriptor(
                spec, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
                secondaryComparatorFactories, secondaryBloomFilterKeyFields, dataflowHelperFactory,
                NoOpOperationCallbackFactory.INSTANCE, deletedFiles, fieldPermutation, fillFactor, numElementsHint);
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkModifyOp,
                secondaryPartitionConstraint);
        return treeIndexBulkModifyOp;
    }

    /** Returns the external files last supplied via {@link #setExternalFiles}, or {@code null} if none. */
    public List<ExternalFile> getExternalFiles() {
        return externalFiles;
    }

    /** Sets the external files used when building external-dataset index operators. */
    public void setExternalFiles(List<ExternalFile> externalFiles) {
        this.externalFiles = externalFiles;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexOperationsHelper.java deleted file mode 100644 index 74c4256..0000000 --- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexOperationsHelper.java +++ /dev/null @@ -1,373 +0,0 @@ -/* - * Copyright 2009-2013 by The Regents of the University of California - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in
compliance with the License. - * you may obtain a copy of the License from - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.uci.ics.asterix.file; - -import java.util.List; - -import edu.uci.ics.asterix.common.api.ILocalResourceMetadata; -import edu.uci.ics.asterix.common.config.AsterixStorageProperties; -import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType; -import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider; -import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider; -import edu.uci.ics.asterix.common.exceptions.AsterixException; -import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory; -import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider; -import edu.uci.ics.asterix.metadata.entities.Index; -import edu.uci.ics.asterix.om.types.IAType; -import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil; -import edu.uci.ics.asterix.runtime.formats.FormatUtils; -import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; -import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata; -import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider; -import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; -import edu.uci.ics.hyracks.algebricks.common.utils.Pair; -import 
edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; -import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; -import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider; -import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider; -import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory; -import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; -import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; -import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer; -import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits; -import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; -import edu.uci.ics.hyracks.api.job.JobSpecification; -import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory; -import edu.uci.ics.hyracks.data.std.primitive.ShortPointable; -import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer; -import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator; -import 
edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor; -import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory; -import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory; -import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; -import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import edu.uci.ics.hyracks.storage.common.file.LocalResource; - -public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperationsHelper { - - private IAType secondaryKeyType; - private ITypeTraits[] invListsTypeTraits; - private IBinaryComparatorFactory[] tokenComparatorFactories; - private ITypeTraits[] tokenTypeTraits; - private IBinaryTokenizerFactory tokenizerFactory; - // For tokenization, sorting and loading. Represents . - private int numTokenKeyPairFields; - private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories; - private RecordDescriptor tokenKeyPairRecDesc; - private boolean isPartitioned; - private int[] invertedIndexFields; - private int[] invertedIndexFieldsForNonBulkLoadOps; - private int[] secondaryFilterFieldsForNonBulkLoadOps; - - protected SecondaryInvertedIndexOperationsHelper(PhysicalOptimizationConfig physOptConf, - IAsterixPropertiesProvider propertiesProvider) { - super(physOptConf, propertiesProvider); - } - - @Override - @SuppressWarnings("rawtypes") - protected void setSecondaryRecDescAndComparators(IndexType indexType, List> secondaryKeyFields, - List secondaryKeyTypes, int gramLength, AqlMetadataProvider metadata) throws AlgebricksException, - AsterixException { - // Sanity checks. 
- if (numPrimaryKeys > 1) { - throw new AsterixException("Cannot create inverted index on dataset with composite primary key."); - } - if (numSecondaryKeys > 1) { - throw new AsterixException("Cannot create composite inverted index on multiple fields."); - } - if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX - || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) { - isPartitioned = true; - } else { - isPartitioned = false; - } - // Prepare record descriptor used in the assign op, and the optional - // select op. - secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys + numFilterFields]; - ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys - + numFilterFields]; - ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields]; - secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys]; - ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys]; - ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider(); - ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider(); - if (numSecondaryKeys > 0) { - secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory( - isEnforcingKeyTypes ? 
enforcedItemType : itemType, secondaryKeyFields.get(0), numPrimaryKeys); - Pair keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), - secondaryKeyFields.get(0), itemType); - secondaryKeyType = keyTypePair.first; - anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second; - ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType); - secondaryRecFields[0] = keySerde; - secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType); - } - if (numFilterFields > 0) { - secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat() - .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys); - Pair keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType); - IAType type = keyTypePair.first; - ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type); - secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde; - } - secondaryRecDesc = new RecordDescriptor(secondaryRecFields); - // Comparators and type traits for tokens. - int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1; - tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields]; - tokenTypeTraits = new ITypeTraits[numTokenFields]; - tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType); - tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType); - if (isPartitioned) { - // The partitioning field is hardcoded to be a short *without* an Asterix type tag. - tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY); - tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS; - } - // Set tokenizer factory. - // TODO: We might want to expose the hashing option at the AQL level, - // and add the choice to the index metadata. 
- tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType, - gramLength); - // Type traits for inverted-list elements. Inverted lists contain - // primary keys. - invListsTypeTraits = new ITypeTraits[numPrimaryKeys]; - if (numPrimaryKeys > 0) { - invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0]; - enforcedRecFields[0] = primaryRecDesc.getFields()[0]; - enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0]; - } - enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType); - enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits); - // For tokenization, sorting and loading. - // One token (+ optional partitioning field) + primary keys. - numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys; - ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields - + numFilterFields]; - ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields]; - tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields]; - tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType); - tokenKeyPairTypeTraits[0] = tokenTypeTraits[0]; - tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType); - int pkOff = 1; - if (isPartitioned) { - tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE; - tokenKeyPairTypeTraits[1] = tokenTypeTraits[1]; - tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY); - pkOff = 2; - } - if (numPrimaryKeys > 0) { - tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0]; - tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0]; - tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0]; - } - if (numFilterFields > 0) { - tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + 
numSecondaryKeys]; - } - tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits); - if (filterFieldName != null) { - invertedIndexFields = new int[numTokenKeyPairFields]; - for (int i = 0; i < invertedIndexFields.length; i++) { - invertedIndexFields[i] = i; - } - secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields]; - secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys; - invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys]; - for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) { - invertedIndexFieldsForNonBulkLoadOps[i] = i; - } - } - - } - - @Override - protected int getNumSecondaryKeys() { - return numTokenKeyPairFields - numPrimaryKeys; - } - - @Override - public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - //prepare a LocalResourceMetadata which will be stored in NC's local resource repository - ILocalResourceMetadata localResourceMetadata = new LSMInvertedIndexLocalResourceMetadata(invListsTypeTraits, - primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned, - dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, - filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps, - invertedIndexFieldsForNonBulkLoadOps); - ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider( - localResourceMetadata, LocalResource.LSMInvertedIndexResource); - - IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); - LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, - 
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories, - invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory, - localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp, - secondaryPartitionConstraint); - spec.addRoot(invIndexCreateOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - @Override - public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - // Create dummy key provider for feeding the primary index scan. - AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec); - - // Create primary index scan op. - BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec); - - AbstractOperatorDescriptor sourceOp = primaryScanOp; - if (isEnforcingKeyTypes) { - sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType()); - spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); - } - AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys); - - // If any of the secondary fields are nullable, then add a select op - // that filters nulls. - AlgebricksMetaOperatorDescriptor selectOp = null; - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys); - } - - // Create a tokenizer op. - AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec); - - // Sort by token + primary keys. - ExternalSortOperatorDescriptor sortOp = createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc); - - // Create secondary inverted index bulk load op. 
- LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec); - - AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, - new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {}); - // Connect the operators. - spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0); - } else { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0); - } - spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0); - spec.addRoot(metaOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException { - int docField = 0; - int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields]; - for (int i = 0; i < primaryKeyFields.length; i++) { - primaryKeyFields[i] = numSecondaryKeys + i; - } - BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, - tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned, false); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp, - primaryPartitionConstraint); - return tokenizerOp; - } - - @Override - protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec, - IBinaryComparatorFactory[] secondaryComparatorFactories, 
RecordDescriptor secondaryRecDesc) { - // Sort on token and primary keys. - int[] sortFields = new int[numTokenKeyPairFields]; - for (int i = 0; i < numTokenKeyPairFields; i++) { - sortFields[i] = i; - } - ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec, - physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories, secondaryRecDesc); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint); - return sortOp; - } - - private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) { - int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields]; - for (int i = 0; i < fieldPermutation.length; i++) { - fieldPermutation[i] = i; - } - IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); - LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor( - spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories, - invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp, - secondaryPartitionConstraint); - return invIndexBulkLoadOp; - } - - private IIndexDataflowHelperFactory createDataflowHelperFactory() { - AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties(); - boolean temp = dataset.getDatasetDetails().isTemp(); - if (!isPartitioned) { - return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider( - dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - 
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits, - filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps, - invertedIndexFieldsForNonBulkLoadOps, !temp); - } else { - return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider( - dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits, - filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps, - invertedIndexFieldsForNonBulkLoadOps, !temp); - } - } - - @Override - public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); - LSMInvertedIndexCompactOperator compactOp = new LSMInvertedIndexCompactOperator(spec, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories, - invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory, - NoOpOperationCallbackFactory.INSTANCE); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - secondaryPartitionConstraint); - - spec.addRoot(compactOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } -}