asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [05/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage
Date Tue, 25 Aug 2015 16:43:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
new file mode 100644
index 0000000..d91e820
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -0,0 +1,408 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.external.IndexingConstants;
+import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.resource.ExternalRTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryRTreeOperationsHelper extends SecondaryIndexOperationsHelper {
+
+    protected IPrimitiveValueProviderFactory[] valueProviderFactories;
+    protected int numNestedSecondaryKeyFields;
+    protected ATypeTag keyType;
+    protected int[] primaryKeyFields;
+    protected int[] rtreeFields;
+
+    protected SecondaryRTreeOperationsHelper(PhysicalOptimizationConfig physOptConf,
+            IAsterixPropertiesProvider propertiesProvider) {
+        super(physOptConf, propertiesProvider);
+    }
+
+    @Override
+    public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        IIndexDataflowHelperFactory indexDataflowHelperFactory;
+        ILocalResourceFactoryProvider localResourceFactoryProvider;
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+            ILocalResourceMetadata localResourceMetadata = new LSMRTreeLocalResourceMetadata(secondaryTypeTraits,
+                    secondaryComparatorFactories, primaryComparatorFactories, valueProviderFactories,
+                    RTreePolicyType.RTREE, AqlMetadataProvider.proposeLinearizer(keyType,
+                            secondaryComparatorFactories.length), dataset.getDatasetId(), mergePolicyFactory,
+                    mergePolicyFactoryProperties, filterTypeTraits, filterCmpFactories, rtreeFields, primaryKeyFields,
+                    secondaryFilterFields);
+            localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
+                    LocalResource.LSMRTreeResource);
+            indexDataflowHelperFactory = new LSMRTreeDataflowHelperFactory(valueProviderFactories,
+                    RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
+                            dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                    AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                    storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, primaryKeyFields,
+                    filterTypeTraits, filterCmpFactories, secondaryFilterFields, !temp);
+        } else {
+            // External dataset
+            // Prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+            ILocalResourceMetadata localResourceMetadata = new ExternalRTreeLocalResourceMetadata(secondaryTypeTraits,
+                    secondaryComparatorFactories, ExternalIndexingOperations.getBuddyBtreeComparatorFactories(),
+                    valueProviderFactories, RTreePolicyType.RTREE, AqlMetadataProvider.proposeLinearizer(keyType,
+                            secondaryComparatorFactories.length), dataset.getDatasetId(), mergePolicyFactory,
+                    mergePolicyFactoryProperties, primaryKeyFields);
+            localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
+                    LocalResource.ExternalRTreeResource);
+
+            indexDataflowHelperFactory = new ExternalRTreeDataflowHelperFactory(valueProviderFactories,
+                    RTreePolicyType.RTREE, ExternalIndexingOperations.getBuddyBtreeComparatorFactories(),
+                    mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
+                            dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+                            secondaryComparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate(),
+                    new int[] { numNestedSecondaryKeyFields },
+                    ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
+        }
+
+        TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, null,
+                indexDataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(secondaryIndexCreateOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numNestedSecondaryKeyFields;
+    }
+
+    @Override
+    protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
+            List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadata) throws AlgebricksException,
+            AsterixException {
+        int numSecondaryKeys = secondaryKeyFields.size();
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException(
+                    "Cannot use "
+                            + numSecondaryKeys
+                            + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+        }
+        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        anySecondaryKeyIsNullable = spatialTypePair.second;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        numNestedSecondaryKeyFields = numDimensions * 2;
+        int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
+        secondaryFieldAccessEvalFactories = metadata.getFormat().createMBRFactory(
+                isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(0), recordColumn,
+                numDimensions, filterFieldName);
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+                + numNestedSecondaryKeyFields + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    nestedKeyType, true);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+
+        }
+        // Add serializers and comparators for primary index fields.
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+                enforcedRecFields[i] = primaryRecDesc.getFields()[i];
+                enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
+            }
+        } else {
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
+                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
+                enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i);
+                enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i);
+            }
+        }
+        enforcedRecFields[numPrimaryKeys] = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
+        if (numFilterFields > 0) {
+            rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+            for (int i = 0; i < rtreeFields.length; i++) {
+                rtreeFields[i] = i;
+            }
+
+            Pair<IAType, Boolean> typePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = typePair.first;
+            ISerializerDeserializer serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        primaryKeyFields = new int[numPrimaryKeys];
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
+        }
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            // Create dummy key provider for feeding the primary index scan. 
+            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+
+            // Create primary index scan op.
+            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+
+            // Assign op.
+            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            if (isEnforcingKeyTypes) {
+                sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+            }
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp,
+                    numNestedSecondaryKeyFields);
+
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFields);
+            }
+
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                    new IBinaryComparatorFactory[] { AqlMetadataProvider.proposeLinearizer(keyType,
+                            secondaryComparatorFactories.length) }, secondaryRecDesc);
+
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+            // Create secondary RTree bulk load op.
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+                    spec,
+                    numNestedSecondaryKeyFields,
+                    new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                            primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            mergePolicyFactory, mergePolicyFactoryProperties,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+                                    secondaryComparatorFactories.length), storageProperties
+                                    .getBloomFilterFalsePositiveRate(), rtreeFields, primaryKeyFields,
+                            filterTypeTraits, filterCmpFactories, secondaryFilterFields, !temp),
+                    GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+            AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                    new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
+            // Connect the operators.
+            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+            spec.addRoot(metaOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        } else {
+            // External dataset
+            /*
+             * In case of external data, this method is used to build loading jobs for both initial load on index creation
+             * and transaction load on dataset refresh
+             */
+            // Create external indexing scan operator
+            ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
+            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            if (isEnforcingKeyTypes) {
+                sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
+                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+            }
+            // Assign op.
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec, numNestedSecondaryKeyFields);
+
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+            }
+
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                    new IBinaryComparatorFactory[] { AqlMetadataProvider.proposeLinearizer(keyType,
+                            secondaryComparatorFactories.length) }, secondaryRecDesc);
+            AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+
+            // Create the dataflow helper factory
+            ExternalRTreeDataflowHelperFactory dataflowHelperFactory = new ExternalRTreeDataflowHelperFactory(
+                    valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories, mergePolicyFactory,
+                    mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                    AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                    storageProperties.getBloomFilterFalsePositiveRate(), new int[] { numNestedSecondaryKeyFields },
+                    ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
+            // Create secondary RTree bulk load op.
+            IOperatorDescriptor root;
+            AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
+            if (externalFiles != null) {
+                // Transaction load
+                secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, numNestedSecondaryKeyFields,
+                        dataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+                root = secondaryBulkLoadOp;
+            } else {
+                // Initial load
+                secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numNestedSecondaryKeyFields,
+                        dataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+                AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                        new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
+                        new RecordDescriptor[] { secondaryRecDesc });
+                spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+                root = metaOp;
+            }
+
+            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.addRoot(root);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        }
+        return spec;
+    }
+
+    @Override
+    public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+        AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        LSMTreeIndexCompactOperatorDescriptor compactOp;
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
+                    secondaryComparatorFactories, secondaryBloomFilterKeyFields, new LSMRTreeDataflowHelperFactory(
+                            valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
+                            new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
+                            mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
+                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+                                    secondaryComparatorFactories.length),
+                            storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, primaryKeyFields,
+                            filterTypeTraits, filterCmpFactories, secondaryFilterFields, !temp),
+                    NoOpOperationCallbackFactory.INSTANCE);
+        } else {
+            // External dataset
+            compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
+                    secondaryComparatorFactories, secondaryBloomFilterKeyFields,
+                    new ExternalRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                            primaryComparatorFactories, mergePolicyFactory, mergePolicyFactoryProperties,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+                                    secondaryComparatorFactories.length), storageProperties
+                                    .getBloomFilterFalsePositiveRate(), new int[] { numNestedSecondaryKeyFields },
+                            ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true),
+                    NoOpOperationCallbackFactory.INSTANCE);
+        }
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(compactOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
new file mode 100644
index 0000000..8e633a9
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.file.ExternalIndexingOperations;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
+
+    private static ClusterState state;
+    private static final Logger LOGGER = Logger.getLogger(AsterixGlobalRecoveryManager.class.getName());
+    private HyracksConnection hcc;
+    public static AsterixGlobalRecoveryManager INSTANCE;
+
+    public AsterixGlobalRecoveryManager(HyracksConnection hcc) throws Exception {
+        state = AsterixClusterProperties.INSTANCE.getState();
+        this.hcc = hcc;
+    }
+
+    /**
+     * Refreshes the cached cluster state and flags global recovery as not completed.
+     *
+     * @param deadNodeIds
+     *            ids of the nodes that failed (not inspected here)
+     * @return always {@code null}; no cluster management work is requested
+     */
+    @Override
+    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
+        final ClusterState stateAfterFailure = AsterixClusterProperties.INSTANCE.getState();
+        state = stateAfterFailure;
+        AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
+        return null;
+    }
+
+    /**
+     * Reacts to a node joining the cluster. When the cluster state differs from the last recorded
+     * state and is now {@link ClusterState#ACTIVE}, a background thread is started that performs
+     * global recovery of external datasets that have indexes: datasets found in the BEGIN state
+     * are rolled back, datasets found in the READY_TO_COMMIT state are rolled forward, and in both
+     * cases the dataset is then marked COMMIT.
+     *
+     * @param joinedNodeId
+     *            id of the node that joined (not inspected by the recovery logic)
+     * @return always {@code null}; no cluster management work is requested
+     */
+    @Override
+    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+        // perform global recovery if state changed to active
+        final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
+        boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
+        if (needToRecover) {
+            Thread recoveryThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    LOGGER.info("Starting AsterixDB's Global Recovery");
+                    MetadataTransactionContext mdTxnCtx = null;
+                    try {
+                        Thread.sleep(4000);
+                        MetadataManager.INSTANCE.init();
+                        // Loop over datasets
+                        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                        List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+                        for (Dataverse dataverse : dataverses) {
+                            if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
+                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse, CentralFeedManager.getInstance());
+                                List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                                        dataverse.getDataverseName());
+                                for (Dataset dataset : datasets) {
+                                    if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+                                        // External dataset
+                                        // Get indexes
+                                        List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+                                                dataset.getDataverseName(), dataset.getDatasetName());
+                                        if (indexes.size() > 0) {
+                                            // Get the state of the dataset
+                                            ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset
+                                                    .getDatasetDetails();
+                                            ExternalDatasetTransactionState datasetState = dsd.getState();
+                                            if (datasetState == ExternalDatasetTransactionState.BEGIN) {
+                                                List<ExternalFile> files = MetadataManager.INSTANCE
+                                                        .getDatasetExternalFiles(mdTxnCtx, dataset);
+                                                // if presumed abort, roll backward
+                                                // 1. delete all pending files
+                                                for (ExternalFile file : files) {
+                                                    if (file.getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
+                                                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                                                    }
+                                                }
+                                                // 2. clean artifacts in NCs
+                                                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                                                JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(
+                                                        dataset, indexes, metadataProvider);
+                                                executeHyracksJob(jobSpec);
+                                                // 3. correct the dataset state
+                                                ((ExternalDatasetDetails) dataset.getDatasetDetails())
+                                                        .setState(ExternalDatasetTransactionState.COMMIT);
+                                                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+                                                // Commit per dataset and continue in a fresh transaction
+                                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                                                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                                            } else if (datasetState == ExternalDatasetTransactionState.READY_TO_COMMIT) {
+                                                List<ExternalFile> files = MetadataManager.INSTANCE
+                                                        .getDatasetExternalFiles(mdTxnCtx, dataset);
+                                                // if ready to commit, roll forward
+                                                // 1. commit indexes in NCs
+                                                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                                                JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(
+                                                        dataset, indexes, metadataProvider);
+                                                executeHyracksJob(jobSpec);
+                                                // 2. add pending files in metadata
+                                                for (ExternalFile file : files) {
+                                                    if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP) {
+                                                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                                                        file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+                                                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
+                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+                                                        // find original file
+                                                        for (ExternalFile originalFile : files) {
+                                                            if (originalFile.getFileName().equals(file.getFileName())) {
+                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+                                                                        file);
+                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+                                                                        originalFile);
+                                                                break;
+                                                            }
+                                                        }
+                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+                                                        // find original file
+                                                        // NOTE(review): unlike the drop case above there is no break here,
+                                                        // so every same-named entry is processed - confirm this is intended.
+                                                        for (ExternalFile originalFile : files) {
+                                                            if (originalFile.getFileName().equals(file.getFileName())) {
+                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+                                                                        file);
+                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+                                                                        originalFile);
+                                                                originalFile.setSize(file.getSize());
+                                                                MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
+                                                                        originalFile);
+                                                            }
+                                                        }
+                                                    }
+                                                }
+                                                // 3. correct the dataset state
+                                                ((ExternalDatasetDetails) dataset.getDatasetDetails())
+                                                        .setState(ExternalDatasetTransactionState.COMMIT);
+                                                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+                                                // Commit per dataset and continue in a fresh transaction
+                                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                                                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    } catch (Exception e) {
+                        // This needs to be fixed <-- Needs to shutdown the system -->
+                        /*
+                         * Note: Throwing this illegal state exception will terminate this thread
+                         * and feeds listeners will not be notified.
+                         */
+                        // Use the Throwable overload so the stack trace of the failure is kept.
+                        LOGGER.log(Level.SEVERE, "Global recovery was not completed successfully", e);
+                        // The failure may have occurred before any transaction was begun
+                        // (during the initial sleep or the metadata manager init); only abort a real one.
+                        if (mdTxnCtx != null) {
+                            try {
+                                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                            } catch (Exception e1) {
+                                // Report the abort failure itself (e1), not the original exception.
+                                LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
+                                throw new IllegalStateException(e1);
+                            }
+                        }
+                    }
+                    AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(true);
+                    LOGGER.info("Global Recovery Completed");
+                }
+            });
+            // Record the new state before starting so a second join does not trigger another recovery
+            state = newState;
+            recoveryThread.start();
+        }
+        return null;
+    }
+
+    /**
+     * Submits the given job with re-attempts disabled and blocks until it completes.
+     *
+     * @param spec
+     *            job specification to run
+     * @throws Exception
+     *             if starting the job or waiting for its completion fails
+     */
+    private void executeHyracksJob(JobSpecification spec) throws Exception {
+        spec.setMaxReattempts(0);
+        hcc.waitForCompletion(hcc.startJob(spec));
+    }
+
+    /**
+     * Callback for completed cluster management work. This subscriber never returns work from
+     * its notification methods (they return {@code null}), and it ignores completion responses.
+     *
+     * @param response
+     *            the completed work response (ignored)
+     */
+    @Override
+    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+        // Do nothing
+    }
+
+    /**
+     * Callback for cluster state transitions. Currently a no-op; the state comparison that
+     * triggers recovery is done in {@code notifyNodeJoin} instead.
+     *
+     * @param previousState
+     *            state before the transition (ignored)
+     * @param newState
+     *            state after the transition (ignored)
+     */
+    @Override
+    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
+        // Do nothing?
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixStateDumpHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixStateDumpHandler.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixStateDumpHandler.java
new file mode 100644
index 0000000..ac760bd
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixStateDumpHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import edu.uci.ics.hyracks.api.application.IStateDumpHandler;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
+
+/**
+ * State dump handler that writes the life cycle component manager's state into a time-stamped
+ * file under a configured directory and reports that file's absolute path to the caller.
+ */
+public class AsterixStateDumpHandler implements IStateDumpHandler {
+    private final String nodeId;
+    private final Path dumpPath;
+    private final ILifeCycleComponentManager lccm;
+
+    /**
+     * @param nodeId
+     *            node id used as the prefix of dump file names
+     * @param dumpPath
+     *            directory in which dump files are created
+     * @param lccm
+     *            component manager whose state is dumped
+     */
+    public AsterixStateDumpHandler(String nodeId, String dumpPath, ILifeCycleComponentManager lccm) {
+        this.lccm = lccm;
+        this.dumpPath = Paths.get(dumpPath);
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * Dumps the component manager's state to {@code <nodeId>-<currentTimeMillis>.dump} inside the
+     * dump directory (created if missing) and writes the file's absolute path, UTF-8 encoded,
+     * to the supplied stream.
+     *
+     * @param os
+     *            stream that receives the absolute path of the dump file
+     * @throws IOException
+     *             if the dump file cannot be written or the path cannot be sent
+     */
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+        final File dumpDirectory = dumpPath.toFile();
+        dumpDirectory.mkdirs();
+        final String dumpFileName = nodeId + "-" + System.currentTimeMillis() + ".dump";
+        final File dumpFile = dumpPath.resolve(dumpFileName).toFile();
+        try (FileOutputStream fileOut = new FileOutputStream(dumpFile)) {
+            lccm.dumpState(fileOut);
+        }
+        final byte[] location = dumpFile.getAbsolutePath().getBytes("UTF-8");
+        os.write(location);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
new file mode 100644
index 0000000..99d883c
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+
+import edu.uci.ics.asterix.api.http.servlet.APIServlet;
+import edu.uci.ics.asterix.api.http.servlet.AQLAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.ConnectorAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.DDLAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.FeedServlet;
+import edu.uci.ics.asterix.api.http.servlet.QueryAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.QueryStatusAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.ShutdownAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.UpdateAPIServlet;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
+import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
+import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.feeds.api.ICentralFeedManager;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
+import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCApplicationEntryPoint;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+
+public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
+    private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
+
+    private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+
+    private Server webServer;
+    private Server jsonAPIServer;
+    private Server feedServer;
+    private ICentralFeedManager centralFeedManager;
+
+    private static IAsterixStateProxy proxy;
+    private ICCApplicationContext appCtx;
+
+    /**
+     * Boots the Asterix side of the cluster controller: installs the thread factory, initializes
+     * the application context info and metadata manager, starts the web, JSON API and feed
+     * servers together with the central feed manager, and finally registers the global recovery
+     * manager and the cluster lifecycle listener.
+     *
+     * @param ccAppCtx
+     *            cluster controller application context; stored for later use
+     * @param args
+     *            application arguments (not read by this method)
+     * @throws Exception
+     *             if any of the initialization steps or server starts fails
+     */
+    @Override
+    public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+        this.appCtx = ccAppCtx;
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting Asterix cluster controller");
+        }
+
+        appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
+        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection());
+
+        // Register the state proxy and publish it as the application's distributed state
+        proxy = AsterixStateProxy.registerRemoteObject();
+        appCtx.setDistributedState(proxy);
+
+        // The metadata manager singleton is built on top of the proxy registered above
+        AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
+        MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
+
+        AsterixAppContextInfo.getInstance().getCCApplicationContext()
+        .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
+
+        // Each server is created by its setup method and then started
+        AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+        setupWebServer(externalProperties);
+        webServer.start();
+
+        setupJSONAPIServer(externalProperties);
+        jsonAPIServer.start();
+        ExternalLibraryBootstrap.setUpExternaLibraries(false);
+
+        setupFeedServer(externalProperties);
+        feedServer.start();
+        centralFeedManager = CentralFeedManager.getInstance(); 
+        centralFeedManager.start();
+
+        // Do not proceed until all three servers report that they have started
+        waitUntilServerStart(webServer);
+        waitUntilServerStart(jsonAPIServer);
+        waitUntilServerStart(feedServer);
+
+        // The recovery manager gets its own Hyracks connection and subscribes to cluster events
+        AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
+                (HyracksConnection) getNewHyracksClientConnection());
+        ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
+
+        ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
+    }
+
+    /**
+     * Blocks until the given server reports that it has started, polling once per second.
+     *
+     * @param webServer
+     *            the server (or other life cycle component) to wait for
+     * @throws Exception
+     *             if the server reports failure, or if the waiting thread is interrupted
+     */
+    private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception {
+        while (!webServer.isStarted()) {
+            if (webServer.isFailed()) {
+                throw new Exception("Server failed to start");
+            }
+            // Object.wait() requires owning this object's monitor, which this method does not;
+            // it would throw IllegalMonitorStateException. A plain sleep is what polling needs.
+            Thread.sleep(1000);
+        }
+    }
+
+    /**
+     * Unregisters the state proxy and stops the web, JSON API and feed servers, in that order.
+     *
+     * @throws Exception
+     *             if unregistering the proxy or stopping a server fails
+     */
+    @Override
+    public void stop() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Stopping Asterix cluster controller");
+        }
+        AsterixStateProxy.unregisterRemoteObject();
+
+        final Server[] serversInStopOrder = { webServer, jsonAPIServer, feedServer };
+        for (Server server : serversInStopOrder) {
+            server.stop();
+        }
+    }
+
+    /**
+     * Opens a new Hyracks client connection to the client address and port published by the
+     * cluster controller info of the stored application context.
+     *
+     * @return a freshly created connection
+     * @throws Exception
+     *             if the connection cannot be created
+     */
+    private IHyracksClientConnection getNewHyracksClientConnection() throws Exception {
+        return new HyracksConnection(
+                appCtx.getCCContext().getClusterControllerInfo().getClientNetAddress(),
+                appCtx.getCCContext().getClusterControllerInfo().getClientNetPort());
+    }
+
+    /**
+     * Creates (but does not start) the web interface server on the configured port, with a
+     * session-enabled root context that carries a Hyracks connection and serves
+     * {@link APIServlet} on every path.
+     *
+     * @param externalProperties
+     *            source of the web interface port
+     * @throws Exception
+     *             if the Hyracks connection cannot be created
+     */
+    private void setupWebServer(AsterixExternalProperties externalProperties) throws Exception {
+        webServer = new Server(externalProperties.getWebInterfacePort());
+
+        ServletContextHandler rootContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        rootContext.setContextPath("/");
+        rootContext.setAttribute(HYRACKS_CONNECTION_ATTR, getNewHyracksClientConnection());
+
+        webServer.setHandler(rootContext);
+        rootContext.addServlet(new ServletHolder(new APIServlet()), "/*");
+    }
+
+    /**
+     * Creates (but does not start) the JSON API server on the configured port, with a
+     * session-enabled root context that carries a Hyracks connection and exposes the query,
+     * update, DDL, AQL, connector and shutdown servlets.
+     *
+     * @param externalProperties
+     *            source of the API server port
+     * @throws Exception
+     *             if the Hyracks connection cannot be created
+     */
+    private void setupJSONAPIServer(AsterixExternalProperties externalProperties) throws Exception {
+        jsonAPIServer = new Server(externalProperties.getAPIServerPort());
+
+        ServletContextHandler apiContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        apiContext.setContextPath("/");
+        apiContext.setAttribute(HYRACKS_CONNECTION_ATTR, getNewHyracksClientConnection());
+
+        jsonAPIServer.setHandler(apiContext);
+
+        // Query submission plus status and result retrieval
+        apiContext.addServlet(new ServletHolder(new QueryAPIServlet()), "/query");
+        apiContext.addServlet(new ServletHolder(new QueryStatusAPIServlet()), "/query/status");
+        apiContext.addServlet(new ServletHolder(new QueryResultAPIServlet()), "/query/result");
+
+        // Statement endpoints
+        apiContext.addServlet(new ServletHolder(new UpdateAPIServlet()), "/update");
+        apiContext.addServlet(new ServletHolder(new DDLAPIServlet()), "/ddl");
+        apiContext.addServlet(new ServletHolder(new AQLAPIServlet()), "/aql");
+
+        // Connector and administration endpoints
+        apiContext.addServlet(new ServletHolder(new ConnectorAPIServlet()), "/connector");
+        apiContext.addServlet(new ServletHolder(new ShutdownAPIServlet()), "/admin/shutdown");
+    }
+
+    /**
+     * Creates (but does not start) the feed server on the configured port, with a
+     * session-enabled root context that carries a Hyracks connection and serves
+     * {@link FeedServlet} at the root path.
+     *
+     * @param externalProperties
+     *            source of the feed server port
+     * @throws Exception
+     *             if the Hyracks connection cannot be created
+     */
+    private void setupFeedServer(AsterixExternalProperties externalProperties) throws Exception {
+        feedServer = new Server(externalProperties.getFeedServerPort());
+
+        ServletContextHandler feedContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        feedContext.setContextPath("/");
+        feedContext.setAttribute(HYRACKS_CONNECTION_ATTR, getNewHyracksClientConnection());
+
+        feedServer.setHandler(feedContext);
+        feedContext.addServlet(new ServletHolder(new FeedServlet()), "/");
+
+        // add paths here
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
new file mode 100644
index 0000000..c7821b2
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse.Status;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWorkResponse;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
+
+public class ClusterLifecycleListener implements IClusterLifecycleListener {
+
+    private static final Logger LOGGER = Logger.getLogger(ClusterLifecycleListener.class.getName());
+
+    private static final LinkedBlockingQueue<Set<IClusterManagementWork>> workRequestQueue = new LinkedBlockingQueue<Set<IClusterManagementWork>>();
+
+    private static ClusterWorkExecutor eventHandler = new ClusterWorkExecutor(workRequestQueue);
+
+    private static List<IClusterManagementWorkResponse> pendingWorkResponses = new ArrayList<IClusterManagementWorkResponse>();
+
+    public static ClusterLifecycleListener INSTANCE = new ClusterLifecycleListener();
+
+    private ClusterLifecycleListener() {
+        Thread t = new Thread(eventHandler);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting cluster event handler");
+        }
+        t.start();
+    }
+
+    /** Kinds of cluster membership events used when updating pending work responses. */
+    public enum ClusterEventType {
+        /** A node controller joined the cluster. */
+        NODE_JOIN,
+        /** One or more node controllers left the cluster. */
+        NODE_FAILURE
+    }
+
+    /**
+     * Handles a node controller joining: records its configuration, updates pending work
+     * responses, notifies every registered subscriber and executes whatever work they request.
+     *
+     * @param nodeId
+     *            id of the node controller that joined
+     * @param ncConfiguration
+     *            configuration reported by the joining node controller
+     */
+    @Override
+    public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("NC: " + nodeId + " joined");
+        }
+        AsterixClusterProperties.INSTANCE.addNCConfiguration(nodeId, ncConfiguration);
+
+        Set<String> joinedNodes = new HashSet<String>();
+        joinedNodes.add(nodeId);
+        updateProgress(ClusterEventType.NODE_JOIN, joinedNodes);
+
+        // Gather the work requested by all subscribers before executing any of it
+        Set<IClusterManagementWork> requestedWork = new HashSet<IClusterManagementWork>();
+        for (IClusterEventsSubscriber subscriber : ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers()) {
+            Set<IClusterManagementWork> contribution = subscriber.notifyNodeJoin(nodeId);
+            if (contribution == null || contribution.isEmpty()) {
+                continue;
+            }
+            requestedWork.addAll(contribution);
+        }
+        if (requestedWork.isEmpty()) {
+            return;
+        }
+        executeWorkSet(requestedWork);
+    }
+
+    /**
+     * Handles node controller failures: removes the configuration of each dead node, updates
+     * pending work responses, notifies every registered subscriber and executes whatever work
+     * they request.
+     *
+     * @param deadNodeIds
+     *            ids of the node controllers that left the cluster
+     */
+    public void notifyNodeFailure(Set<String> deadNodeIds) {
+        final boolean infoEnabled = LOGGER.isLoggable(Level.INFO);
+        for (String deadNode : deadNodeIds) {
+            if (infoEnabled) {
+                LOGGER.info("NC: " + deadNode + " left");
+            }
+            AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
+        }
+        updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
+
+        // Gather the work requested by all subscribers before executing any of it
+        Set<IClusterManagementWork> requestedWork = new HashSet<IClusterManagementWork>();
+        for (IClusterEventsSubscriber subscriber : ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers()) {
+            Set<IClusterManagementWork> contribution = subscriber.notifyNodeFailure(deadNodeIds);
+            if (contribution == null || contribution.isEmpty()) {
+                continue;
+            }
+            requestedWork.addAll(contribution);
+        }
+        if (requestedWork.isEmpty()) {
+            return;
+        }
+        executeWorkSet(requestedWork);
+    }
+
+    /**
+     * Advances the pending work responses that are affected by a cluster event. Responses that
+     * become complete are marked successful, reported to the subscriber that issued the work and
+     * removed from the pending list.
+     *
+     * @param eventType
+     *            kind of event; node failures advance remove-node responses, node joins advance
+     *            add-node responses
+     * @param nodeIds
+     *            ids of the affected nodes; for a join only the first id returned by the set's
+     *            iterator is used
+     */
+    private void updateProgress(ClusterEventType eventType, Set<String> nodeIds) {
+        List<IClusterManagementWorkResponse> completedResponses = new ArrayList<IClusterManagementWorkResponse>();
+        for (IClusterManagementWorkResponse resp : pendingWorkResponses) {
+            boolean isComplete = false;
+            // The pending list holds both add-node and remove-node responses, so each event
+            // must only touch responses of the matching kind; a blind cast would throw
+            // ClassCastException as soon as both kinds are pending at once.
+            switch (eventType) {
+                case NODE_FAILURE:
+                    if (resp instanceof RemoveNodeWorkResponse) {
+                        isComplete = ((RemoveNodeWorkResponse) resp).updateProgress(nodeIds);
+                    }
+                    break;
+
+                case NODE_JOIN:
+                    if (resp instanceof AddNodeWorkResponse) {
+                        isComplete = ((AddNodeWorkResponse) resp).updateProgress(nodeIds.iterator().next());
+                    }
+                    break;
+            }
+            if (isComplete) {
+                resp.setStatus(Status.SUCCESS);
+                resp.getWork().getSourceSubscriber().notifyRequestCompletion(resp);
+                completedResponses.add(resp);
+            }
+        }
+        // Removed after the loop to avoid modifying the list while iterating over it
+        pendingWorkResponses.removeAll(completedResponses);
+    }
+
+    /**
+     * Processes a set of cluster management work items.
+     * <p>
+     * For remove-node work a pending {@link RemoveNodeWorkResponse} is recorded. For add-node
+     * work the number of nodes to add is the largest number requested by any single item;
+     * substitution nodes are taken from the cluster properties and added through the cluster
+     * manager. Each add-node item is then answered: with an immediate failure when no node could
+     * be added, otherwise with a pending {@link AddNodeWorkResponse} listing up to the requested
+     * number of added nodes.
+     *
+     * @param workSet
+     *            work items collected from the cluster event subscribers
+     */
+    private void executeWorkSet(Set<IClusterManagementWork> workSet) {
+        int nodesToAdd = 0;
+        // NOTE(review): nodesToRemove and nodeRemovalRequests are filled below but never read
+        // in this method - confirm whether the removal itself is expected to happen elsewhere.
+        Set<String> nodesToRemove = new HashSet<String>();
+        Set<AddNodeWork> nodeAdditionRequests = new HashSet<AddNodeWork>();
+        Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
+        for (IClusterManagementWork w : workSet) {
+            switch (w.getClusterManagementWorkType()) {
+                case ADD_NODE:
+                    // Track the maximum, not the sum, of the requested node counts
+                    if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodesRequested()) {
+                        nodesToAdd = ((AddNodeWork) w).getNumberOfNodesRequested();
+                    }
+                    nodeAdditionRequests.add((AddNodeWork) w);
+                    break;
+                case REMOVE_NODE:
+                    nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
+                    nodeRemovalRequests.add(w);
+                    RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w, Status.IN_PROGRESS);
+                    pendingWorkResponses.add(response);
+                    break;
+            }
+        }
+
+        List<String> addedNodes = new ArrayList<String>();
+        String asterixInstanceName = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName();
+        for (int i = 0; i < nodesToAdd; i++) {
+            Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
+            if (node != null) {
+                try {
+                    ClusterManager.INSTANCE.addNode(node);
+                    addedNodes.add(asterixInstanceName + "_" + node.getId());
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Added NC at:" + node.getId());
+                    }
+                } catch (AsterixException e) {
+                    // Report the failure with its stack trace through the logger rather than
+                    // printing it to stderr.
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Unable to add NC at:" + node.getId(), e);
+                    }
+                }
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Unable to add NC: no more available nodes");
+                }
+
+            }
+        }
+
+        for (AddNodeWork w : nodeAdditionRequests) {
+            int n = w.getNumberOfNodesRequested();
+            // Every request is served from the start of the same list of added nodes
+            List<String> nodesToBeAddedForWork = new ArrayList<String>();
+            for (int i = 0; i < n && i < addedNodes.size(); i++) {
+                nodesToBeAddedForWork.add(addedNodes.get(i));
+            }
+            if (nodesToBeAddedForWork.isEmpty()) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Unable to satisfy request by " + w);
+                }
+                AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
+                response.setStatus(Status.FAILURE);
+                w.getSourceSubscriber().notifyRequestCompletion(response);
+
+            } else {
+                AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
+                pendingWorkResponses.add(response);
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
new file mode 100644
index 0000000..a4faea7
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+
+/**
+ * Runnable that repeatedly takes a set of cluster management work items from
+ * its inbox and tries to add as many substitution nodes as the largest
+ * ADD_NODE request in that set asks for.
+ * <p>
+ * REMOVE_NODE requests are only collected here; nothing in this class acts on
+ * them, and no response is sent back for any request.
+ */
+public class ClusterWorkExecutor implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(ClusterWorkExecutor.class.getName());
+
+    private final LinkedBlockingQueue<Set<IClusterManagementWork>> inbox;
+
+    /**
+     * @param inbox
+     *            queue from which sets of work items are taken
+     */
+    public ClusterWorkExecutor(LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) {
+        this.inbox = inbox;
+    }
+
+    /**
+     * Loops forever, blocking on the inbox. The loop ends only when the thread
+     * is interrupted, in which case the interrupt flag is restored and an
+     * {@link IllegalStateException} wrapping the interruption is thrown. Any
+     * other exception is logged and the loop continues with the next work set.
+     */
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                Set<IClusterManagementWork> workSet = inbox.take();
+                int nodesToAdd = 0;
+                Set<String> nodesToRemove = new HashSet<String>();
+                Set<IClusterManagementWork> nodeAdditionRequests = new HashSet<IClusterManagementWork>();
+                Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
+                for (IClusterManagementWork w : workSet) {
+                    switch (w.getClusterManagementWorkType()) {
+                        case ADD_NODE:
+                            // The number of nodes to add is the maximum over all requests, not the sum.
+                            if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodesRequested()) {
+                                nodesToAdd = ((AddNodeWork) w).getNumberOfNodesRequested();
+                            }
+                            nodeAdditionRequests.add(w);
+                            break;
+                        case REMOVE_NODE:
+                            nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
+                            nodeRemovalRequests.add(w);
+                            break;
+                        default:
+                            break;
+                    }
+                }
+
+                Set<Node> addedNodes = new HashSet<Node>();
+                for (int i = 0; i < nodesToAdd; i++) {
+                    Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
+                    if (node != null) {
+                        try {
+                            ClusterManager.INSTANCE.addNode(node);
+                            addedNodes.add(node);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Added NC at:" + node.getId());
+                            }
+                        } catch (AsterixException e) {
+                            // Log through the logger (with the stack trace) instead of printStackTrace().
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.log(Level.WARNING, "Unable to add NC at:" + node.getId(), e);
+                            }
+                        }
+                    } else {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to add NC: no more available nodes");
+                        }
+                    }
+                }
+
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so code further up the stack can still observe it.
+                Thread.currentThread().interrupt();
+                LOGGER.log(Level.SEVERE, "Interrupted while waiting for cluster management work", e);
+                throw new IllegalStateException(e);
+            } catch (Exception e) {
+                // Pass the exception itself: getMessage() may be null and loses the stack trace.
+                LOGGER.log(Level.SEVERE, "Unexpected exception in handling cluster event", e);
+            }
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
new file mode 100755
index 0000000..31aca3d
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
@@ -0,0 +1,318 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.external.library.ExternalLibrary;
+import edu.uci.ics.asterix.external.library.LibraryAdapter;
+import edu.uci.ics.asterix.external.library.LibraryFunction;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
+import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
+import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
+
+public class ExternalLibraryBootstrap {
+
+    private static Logger LOGGER = Logger.getLogger(ExternalLibraryBootstrap.class.getName());
+
+    /**
+     * Registers a class loader for every library found under the install
+     * directory (laid out as {@code <install dir>/<dataverse>/<library>}). On
+     * the metadata node it first processes pending uninstalls and then installs
+     * each library found into the metadata if needed.
+     *
+     * @param isMetadataNode
+     *            whether this node should also update the metadata
+     * @throws Exception
+     *             if uninstalling or registering a library fails
+     */
+    public static void setUpExternaLibraries(boolean isMetadataNode) throws Exception {
+
+        Map<String, List<String>> uninstalledLibs = null;
+        if (isMetadataNode) {
+            uninstalledLibs = uninstallLibraries();
+        }
+
+        File installLibDir = getLibraryInstallDir();
+        // File.list() returns null when the path does not exist, is not a directory or cannot be read.
+        String[] dataverses = installLibDir.list();
+        if (dataverses == null) {
+            return;
+        }
+        for (String dataverse : dataverses) {
+            File dataverseDir = new File(installLibDir, dataverse);
+            String[] libraries = dataverseDir.list();
+            if (libraries == null) {
+                // A stray file or unreadable entry in the install directory; nothing to register.
+                continue;
+            }
+            for (String library : libraries) {
+                registerLibrary(dataverse, library, isMetadataNode, installLibDir);
+                if (isMetadataNode) {
+                    File libraryDir = new File(installLibDir.getAbsolutePath() + File.separator + dataverse
+                            + File.separator + library);
+                    installLibraryIfNeeded(dataverse, libraryDir, uninstalledLibs);
+                }
+            }
+        }
+    }
+
+    /**
+     * Processes every entry of the uninstall directory. Each entry name is
+     * expected to have the form {@code <dataverse>.<library>}; the library is
+     * uninstalled from the metadata and the entry is then deleted. Entries
+     * whose name does not contain both parts are skipped with a warning.
+     *
+     * @return the names of the processed libraries, grouped by dataverse;
+     *         empty when there is nothing to uninstall
+     * @throws Exception
+     *             if uninstalling a library fails
+     */
+    private static Map<String, List<String>> uninstallLibraries() throws Exception {
+        Map<String, List<String>> uninstalledLibs = new HashMap<String, List<String>>();
+        File uninstallLibDir = getLibraryUninstallDir();
+        // File.list() returns null when the path does not exist, is not a directory or cannot be read.
+        String[] uninstallLibNames = uninstallLibDir.list();
+        if (uninstallLibNames == null) {
+            return uninstalledLibs;
+        }
+        for (String uninstallLibName : uninstallLibNames) {
+            String[] components = uninstallLibName.split("\\.");
+            if (components.length < 2) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Ignoring malformed uninstall entry: " + uninstallLibName);
+                }
+                continue;
+            }
+            String dataverse = components[0];
+            String libName = components[1];
+            uninstallLibrary(dataverse, libName);
+            if (!new File(uninstallLibDir, uninstallLibName).delete() && LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to delete uninstall entry: " + uninstallLibName);
+            }
+            List<String> uinstalledLibsInDv = uninstalledLibs.get(dataverse);
+            if (uinstalledLibsInDv == null) {
+                uinstalledLibsInDv = new ArrayList<String>();
+                uninstalledLibs.put(dataverse, uinstalledLibsInDv);
+            }
+            uinstalledLibsInDv.add(libName);
+        }
+        return uninstalledLibs;
+    }
+
+    /**
+     * Removes a library from the metadata within a single metadata
+     * transaction: drops the dataverse's functions and adapters whose name
+     * starts with {@code <libraryName>#}, then drops the library itself.
+     *
+     * @param dataverse
+     *            name of the dataverse the library belongs to
+     * @param libraryName
+     *            name of the library to remove
+     * @return {@code false} if the dataverse or the library is not found in the
+     *         metadata, {@code true} once the library has been dropped
+     * @throws AsterixException
+     *             wrapping any failure; the transaction is aborted first
+     */
+    private static boolean uninstallLibrary(String dataverse, String libraryName) throws AsterixException,
+            RemoteException, ACIDException {
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            // NOTE(review): the two early returns below leave the transaction neither committed
+            // nor aborted -- confirm whether that is intended.
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+            if (dv == null) {
+                return false;
+            }
+
+            edu.uci.ics.asterix.metadata.entities.Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx,
+                    dataverse, libraryName);
+            if (library == null) {
+                return false;
+            }
+
+            String libraryPrefix = libraryName + "#";
+
+            List<edu.uci.ics.asterix.metadata.entities.Function> functions = MetadataManager.INSTANCE
+                    .getDataverseFunctions(mdTxnCtx, dataverse);
+            for (edu.uci.ics.asterix.metadata.entities.Function function : functions) {
+                if (function.getName().startsWith(libraryPrefix)) {
+                    MetadataManager.INSTANCE.dropFunction(mdTxnCtx, new FunctionSignature(dataverse,
+                            function.getName(), function.getArity()));
+                }
+            }
+
+            List<edu.uci.ics.asterix.metadata.entities.DatasourceAdapter> adapters = MetadataManager.INSTANCE
+                    .getDataverseAdapters(mdTxnCtx, dataverse);
+            for (edu.uci.ics.asterix.metadata.entities.DatasourceAdapter adapter : adapters) {
+                if (adapter.getAdapterIdentifier().getName().startsWith(libraryPrefix)) {
+                    MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier().getName());
+                }
+            }
+
+            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverse, libraryName);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // beginTransaction() itself may have failed, in which case there is nothing to abort.
+            if (mdTxnCtx != null) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+            throw new AsterixException(e);
+        }
+        return true;
+    }
+
+    /**
+     * Installs the library found in {@code libraryDir} into the metadata unless
+     * it is already there and was not just uninstalled. The functions and
+     * adapters declared in the library's single XML descriptor are added, then
+     * the library itself, all within one metadata transaction and while holding
+     * the metadata write latch.
+     * <p>
+     * A failure while installing is logged and the transaction is aborted; the
+     * exception is not rethrown, so a failure in one library does not affect
+     * the installation of other libraries.
+     *
+     * @param dataverse
+     *            name of the dataverse the library belongs to
+     * @param libraryDir
+     *            directory of the library; its name is used as the library name
+     * @param uninstalledLibs
+     *            names of libraries that were just uninstalled, by dataverse
+     */
+    private static void installLibraryIfNeeded(String dataverse, final File libraryDir,
+            Map<String, List<String>> uninstalledLibs) throws Exception {
+
+        String libraryName = libraryDir.getName().trim();
+        List<String> uninstalledLibsInDv = uninstalledLibs == null ? null : uninstalledLibs.get(dataverse);
+        boolean wasUninstalled = uninstalledLibsInDv != null && uninstalledLibsInDv.contains(libraryName);
+
+        MetadataTransactionContext mdTxnCtx = null;
+        MetadataManager.INSTANCE.acquireWriteLatch();
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            edu.uci.ics.asterix.metadata.entities.Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(
+                    mdTxnCtx, dataverse, libraryName);
+            if (libraryInMetadata != null && !wasUninstalled) {
+                // NOTE(review): this early return leaves the transaction neither committed nor
+                // aborted -- confirm whether that is intended.
+                return;
+            }
+
+            String[] libraryDescriptors = libraryDir.list(new FilenameFilter() {
+                @Override
+                public boolean accept(File dir, String name) {
+                    return name.endsWith(".xml");
+                }
+            });
+
+            // Validate the descriptor count BEFORE touching libraryDescriptors[0]; list() returns
+            // null when libraryDir is not a readable directory.
+            if (libraryDescriptors == null || libraryDescriptors.length == 0) {
+                throw new Exception("No library descriptor defined");
+            } else if (libraryDescriptors.length > 1) {
+                throw new Exception("More than 1 library descriptors defined");
+            }
+
+            ExternalLibrary library = getLibrary(new File(libraryDir + File.separator + libraryDescriptors[0]));
+
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
+            if (dv == null) {
+                MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverse,
+                        NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, IMetadataEntity.PENDING_NO_OP));
+            }
+            if (library.getLibraryFunctions() != null) {
+                for (LibraryFunction function : library.getLibraryFunctions().getLibraryFunction()) {
+                    String[] fargs = function.getArguments().trim().split(",");
+                    List<String> args = new ArrayList<String>();
+                    for (String arg : fargs) {
+                        args.add(arg);
+                    }
+                    // Functions are stored under the name "<library>#<function>".
+                    edu.uci.ics.asterix.metadata.entities.Function f = new edu.uci.ics.asterix.metadata.entities.Function(
+                            dataverse, libraryName + "#" + function.getName().trim(), args.size(), args, function
+                                    .getReturnType().trim(), function.getDefinition().trim(), library.getLanguage()
+                                    .trim(), function.getFunctionType().trim());
+                    MetadataManager.INSTANCE.addFunction(mdTxnCtx, f);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Installed function: " + libraryName + "#" + function.getName().trim());
+                    }
+                }
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Installed functions contain in library :" + libraryName);
+            }
+
+            if (library.getLibraryAdapters() != null) {
+                for (LibraryAdapter adapter : library.getLibraryAdapters().getLibraryAdapter()) {
+                    String adapterFactoryClass = adapter.getFactoryClass().trim();
+                    // Adapters are stored under the name "<library>#<adapter>".
+                    String adapterName = libraryName + "#" + adapter.getName().trim();
+                    AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName);
+                    DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass, AdapterType.EXTERNAL);
+                    MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Installed adapter: " + adapterName);
+                    }
+                }
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Installed adapters contain in library :" + libraryName);
+            }
+
+            MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new edu.uci.ics.asterix.metadata.entities.Library(dataverse,
+                    libraryName));
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Added library " + libraryName + " to Metadata");
+            }
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // Log at SEVERE (matching the guard) and keep the stack trace in the log.
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "Exception in installing library " + libraryName, e);
+            }
+            // beginTransaction() itself may have failed, in which case there is nothing to abort.
+            if (mdTxnCtx != null) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+        } finally {
+            MetadataManager.INSTANCE.releaseWriteLatch();
+        }
+    }
+
+    /**
+     * Builds the class loader for the given library and registers it with the
+     * {@link ExternalLibraryManager}. The {@code isMetadataNode} and
+     * {@code installLibDir} arguments are not used.
+     */
+    private static void registerLibrary(String dataverse, String libraryName, boolean isMetadataNode, File installLibDir)
+            throws Exception {
+        ExternalLibraryManager.registerLibraryClassLoader(dataverse, libraryName,
+                getLibraryClassLoader(dataverse, libraryName));
+    }
+
+    /**
+     * Unmarshals the given library descriptor file into an
+     * {@link ExternalLibrary} using JAXB.
+     *
+     * @param libraryXMLPath
+     *            the XML descriptor file of the library
+     */
+    private static ExternalLibrary getLibrary(File libraryXMLPath) throws Exception {
+        Unmarshaller descriptorReader = JAXBContext.newInstance(ExternalLibrary.class).createUnmarshaller();
+        return (ExternalLibrary) descriptorReader.unmarshal(libraryXMLPath);
+    }
+
+    /**
+     * Creates a class loader for a library installed under
+     * {@code <install dir>/<dataverse>/<libraryName>}. The class path consists
+     * of the single jar found directly in that directory followed by every jar
+     * in its optional {@code lib} sub-directory. The loader's parent is the
+     * class loader of this class.
+     *
+     * @param dataverse
+     *            name of the dataverse the library belongs to
+     * @param libraryName
+     *            name of the library
+     * @return a class loader over the library jar and its dependencies
+     * @throws Exception
+     *             if the library directory does not contain exactly one jar
+     */
+    private static ClassLoader getLibraryClassLoader(String dataverse, String libraryName) throws Exception {
+
+        File installDir = getLibraryInstallDir();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Installing lirbary " + libraryName + " in dataverse " + dataverse + "."
+                    + " Install Directory: " + installDir.getAbsolutePath());
+        }
+
+        File libDir = new File(installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName);
+        FilenameFilter jarFileFilter = new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.endsWith(".jar");
+            }
+        };
+
+        // list() returns null when libDir is not a readable directory; treat that like "no jar".
+        String[] jarsInLibDir = libDir.list(jarFileFilter);
+        if (jarsInLibDir == null || jarsInLibDir.length == 0) {
+            throw new Exception("Incorrect library structure: could not find library jar");
+        }
+        if (jarsInLibDir.length > 1) {
+            throw new Exception("Incorrect library structure: found multiple library jars");
+        }
+
+        File libJar = new File(libDir, jarsInLibDir[0]);
+        File libDependencyDir = new File(libDir.getAbsolutePath() + File.separator + "lib");
+        int numDependencies = 1;
+        String[] libraryDependencies = null;
+        if (libDependencyDir.exists()) {
+            libraryDependencies = libDependencyDir.list(jarFileFilter);
+            if (libraryDependencies != null) {
+                numDependencies += libraryDependencies.length;
+            }
+        }
+
+        ClassLoader parentClassLoader = ExternalLibraryBootstrap.class.getClassLoader();
+        URL[] urls = new URL[numDependencies];
+        int count = 0;
+        // File.toURL() is deprecated because it does not escape illegal URL characters;
+        // go through toURI() instead.
+        urls[count++] = libJar.toURI().toURL();
+
+        if (libraryDependencies != null) {
+            for (String dependency : libraryDependencies) {
+                File file = new File(libDependencyDir + File.separator + dependency);
+                urls[count++] = file.toURI().toURL();
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            StringBuilder logMesg = new StringBuilder("Classpath for library " + libraryName + "\n");
+            for (URL url : urls) {
+                logMesg.append(url.getFile() + "\n");
+            }
+            LOGGER.info(logMesg.toString());
+        }
+
+        ClassLoader classLoader = new URLClassLoader(urls, parentClassLoader);
+        return classLoader;
+    }
+
+    /**
+     * @return the {@code library} directory under the process working
+     *         directory (the {@code user.dir} system property)
+     */
+    private static File getLibraryInstallDir() {
+        return new File(System.getProperty("user.dir") + File.separator + "library");
+    }
+
+    /**
+     * @return the {@code uninstall} directory under the process working
+     *         directory (the {@code user.dir} system property)
+     */
+    private static File getLibraryUninstallDir() {
+        return new File(System.getProperty("user.dir") + File.separator + "uninstall");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
new file mode 100644
index 0000000..7d22091
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public class FeedBootstrap {
+
+    public final static String FEEDS_METADATA_DV = "feeds_metadata";
+    public final static String FAILED_TUPLE_DATASET = "failed_tuple";
+    public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
+    public final static String FAILED_TUPLE_DATASET_KEY = "id";
+
+    public static void setUpInitialArtifacts() throws Exception {
+
+        StringBuilder builder = new StringBuilder();
+        try {
+            builder.append("create dataverse " + FEEDS_METADATA_DV + ";" + "\n");
+            builder.append("use dataverse " + FEEDS_METADATA_DV + ";" + "\n");
+
+            builder.append("create type " + FAILED_TUPLE_DATASET_TYPE + " as open { ");
+
+            String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple",
+                    "message", "timestamp" };
+            IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
+
+            for (int i = 0; i < fieldNames.length; i++) {
+                if (i > 0) {
+                    builder.append(",");
+                }
+                builder.append(fieldNames[i] + ":");
+                builder.append(fieldTypes[i].getTypeName());
+            }
+            builder.append("}" + ";" + "\n");
+
+            builder.append("create dataset " + FAILED_TUPLE_DATASET + " " + "(" + FAILED_TUPLE_DATASET_TYPE + ")" + " "
+                    + "primary key " + FAILED_TUPLE_DATASET_KEY + " on  " + MetadataConstants.METADATA_NODEGROUP_NAME
+                    + ";");
+
+            CentralFeedManager.AQLExecutor.executeAQL(builder.toString());
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("Error: " + builder.toString());
+            throw e;
+        }
+    }
+
+}


Mime
View raw message