asterixdb-commits mailing list archives

From: amo...@apache.org
Subject: [1/9] asterixdb git commit: Feed Connection Refactoring
Date: Sun, 19 Feb 2017 07:14:47 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 692b8a890 -> fff200ca8


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 21db749..385f2bd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -19,34 +19,21 @@
 package org.apache.asterix.metadata.feeds;
 
 import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -56,34 +43,14 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
-import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 
 /**
  * A utility class for providing helper functions for feeds
@@ -91,8 +58,6 @@ import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageCon
  */
 public class FeedMetadataUtil {
 
-    private static final Logger LOGGER = Logger.getLogger(FeedMetadataUtil.class.getName());
-
     public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx)
             throws CompilationException {
         Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName);
@@ -130,193 +95,6 @@ public class FeedMetadataUtil {
         return feedPolicy;
     }
 
-    public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
-            FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Original Job Spec:" + spec);
-        }
-
-        JobSpecification altered = new JobSpecification(spec.getFrameSize());
-        Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
-        boolean preProcessingRequired = preProcessingRequired(feedConnectionId);
-        // copy operators
-        Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<>();
-        FeedMetaOperatorDescriptor metaOp;
-        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
-            String operandId = null;
-            IOperatorDescriptor opDesc = entry.getValue();
-            if (opDesc instanceof FeedCollectOperatorDescriptor) {
-                FeedCollectOperatorDescriptor orig = (FeedCollectOperatorDescriptor) opDesc;
-                FeedCollectOperatorDescriptor fiop = new FeedCollectOperatorDescriptor(altered,
-                        orig.getFeedConnectionId(), orig.getSourceFeedId(), (ARecordType) orig.getOutputType(),
-                        orig.getRecordDescriptor(), orig.getFeedPolicyProperties(), orig.getSubscriptionLocation());
-                oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
-            } else if ((opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor)
-                    && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
-                // only introduce store before primary index
-                operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
-                metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
-                        FeedRuntimeType.STORE, false, operandId);
-                oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
-            } else {
-                FeedRuntimeType runtimeType;
-                boolean enableSubscriptionMode;
-                OperatorDescriptorId opId = null;
-                if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
-                    IPushRuntimeFactory[] runtimeFactories = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
-                            .getRuntimeFactories();
-                    if (runtimeFactories[0] instanceof AssignRuntimeFactory && runtimeFactories.length > 1) {
-                        IConnectorDescriptor connectorDesc = spec.getOperatorInputMap().get(opDesc.getOperatorId())
-                                .get(0);
-                        IOperatorDescriptor sourceOp = spec.getProducer(connectorDesc);
-                        if (sourceOp instanceof FeedCollectOperatorDescriptor) {
-                            runtimeType = FeedRuntimeType.COMPUTE;
-                            enableSubscriptionMode = preProcessingRequired;
-                            metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
-                                    feedPolicyProperties, runtimeType, enableSubscriptionMode, operandId);
-                            opId = metaOp.getOperatorId();
-                        }
-                    }
-                }
-                if (opId == null) {
-                    opId = altered.createOperatorDescriptorId(opDesc);
-                }
-                oldNewOID.put(opDesc.getOperatorId(), opId);
-            }
-        }
-
-        // copy connectors
-        Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<>();
-        for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
-            IConnectorDescriptor connDesc = entry.getValue();
-            ConnectorDescriptorId newConnId;
-            if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
-                MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc;
-                connDesc = new MToNPartitioningWithMessageConnectorDescriptor(altered,
-                        m2nConn.getTuplePartitionComputerFactory());
-                newConnId = connDesc.getConnectorId();
-            } else {
-                newConnId = altered.createConnectorDescriptor(connDesc);
-            }
-            connectorMapping.put(entry.getKey(), newConnId);
-        }
-
-        // make connections between operators
-        for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>,
-                Pair<IOperatorDescriptor, Integer>>> entry : spec.getConnectorOperatorMap().entrySet()) {
-            IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
-            Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
-            Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
-
-            IOperatorDescriptor leftOpDesc = altered.getOperatorMap()
-                    .get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
-            IOperatorDescriptor rightOpDesc = altered.getOperatorMap()
-                    .get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
-
-            altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
-        }
-
-        // prepare for setting partition constraints
-        Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
-        Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
-
-        for (Constraint constraint : spec.getUserConstraints()) {
-            LValueConstraintExpression lexpr = constraint.getLValue();
-            ConstraintExpression cexpr = constraint.getRValue();
-            OperatorDescriptorId opId;
-            switch (lexpr.getTag()) {
-                case PARTITION_COUNT:
-                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                    operatorCounts.put(opId, (int) ((ConstantExpression) cexpr).getValue());
-                    break;
-                case PARTITION_LOCATION:
-                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-
-                    IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(opId));
-                    List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId());
-                    if (locations == null) {
-                        locations = new ArrayList<>();
-                        operatorLocations.put(opDesc.getOperatorId(), locations);
-                    }
-                    String location = (String) ((ConstantExpression) cexpr).getValue();
-                    LocationConstraint lc = new LocationConstraint(location,
-                            ((PartitionLocationExpression) lexpr).getPartition());
-                    locations.add(lc);
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        // set absolute location constraints
-        for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
-            IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
-            Collections.sort(entry.getValue(), (LocationConstraint o1, LocationConstraint o2) -> {
-                return o1.partition - o2.partition;
-            });
-            String[] locations = new String[entry.getValue().size()];
-            for (int i = 0; i < locations.length; ++i) {
-                locations[i] = entry.getValue().get(i).location;
-            }
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc, locations);
-        }
-
-        // set count constraints
-        for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
-            IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
-            if (!operatorLocations.keySet().contains(entry.getKey())) {
-                PartitionConstraintHelper.addPartitionCountConstraint(altered, opDesc, entry.getValue());
-            }
-        }
-
-        // useConnectorSchedulingPolicy
-        altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
-
-        // connectorAssignmentPolicy
-        altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
-
-        // roots
-        for (OperatorDescriptorId root : spec.getRoots()) {
-            altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root)));
-        }
-
-        // jobEventListenerFactory
-        altered.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("New Job Spec:" + altered);
-        }
-
-        return altered;
-
-    }
-
-    private static boolean preProcessingRequired(FeedConnectionId connectionId) {
-        MetadataTransactionContext ctx = null;
-        Feed feed = null;
-        boolean preProcessingRequired = false;
-        try {
-            MetadataManager.INSTANCE.acquireReadLatch();
-            ctx = MetadataManager.INSTANCE.beginTransaction();
-            feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(),
-                    connectionId.getFeedId().getEntityName());
-            preProcessingRequired = feed.getAppliedFunction() != null;
-            MetadataManager.INSTANCE.commitTransaction(ctx);
-        } catch (Exception e) {
-            if (ctx != null) {
-                try {
-                    MetadataManager.INSTANCE.abortTransaction(ctx);
-                } catch (Exception abortException) {
-                    e.addSuppressed(abortException);
-                    throw new IllegalStateException(e);
-                }
-            }
-        } finally {
-            MetadataManager.INSTANCE.releaseReadLatch();
-        }
-        return preProcessingRequired;
-    }
-
     public static void validateFeed(Feed feed, MetadataTransactionContext mdTxnCtx, ILibraryManager libraryManager)
             throws AsterixException {
         try {
@@ -537,32 +315,4 @@ public class FeedMetadataUtil {
         }
         return outputType;
     }
-
-    public static String getSecondaryFeedOutput(Feed feed, FeedPolicyAccessor policyAccessor,
-            MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException, MetadataException, RemoteException, ACIDException {
-        String outputType = null;
-        String primaryFeedName = feed.getSourceFeedName();
-        Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);
-        FunctionSignature appliedFunction = primaryFeed.getAppliedFunction();
-        if (appliedFunction == null) {
-            outputType = getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME)
-                    .getDisplayName();
-        } else {
-            Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
-            if (function != null) {
-                if (function.getLanguage().equals(Function.LANGUAGE_AQL)) {
-                    throw new NotImplementedException(
-                            "Secondary feeds derived from a source feed that has an applied
AQL function"
-                                    + " are not supported yet.");
-                } else {
-                    outputType = function.getReturnType();
-                }
-            } else {
-                throw new IllegalArgumentException(
-                        "Function " + appliedFunction + " associated with source feed not
found in Metadata.");
-            }
-        }
-        return outputType;
-    }
 }

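For reviewers of the removal above: alterJobSpecificationForFeed rebuilt a compiled Hyracks job by re-creating every operator inside a fresh JobSpecification, recording an old-to-new OperatorDescriptorId mapping, and then re-wiring connectors, edges, and partition constraints through that map. The condensed Java sketch below illustrates that copy pattern using only calls that appear in the deleted method; the feed-specific wrapping (FeedMetaOperatorDescriptor, the M-to-N connector upgrade) is deliberately elided, and this is a sketch, not the committed implementation.

    // Sketch: copy a job spec while tracking old -> new operator ids.
    // Types are those imported by the removed FeedMetadataUtil above.
    JobSpecification altered = new JobSpecification(spec.getFrameSize());
    Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<>();
    for (Entry<OperatorDescriptorId, IOperatorDescriptor> e : spec.getOperatorMap().entrySet()) {
        // Registering an operator in the new spec assigns (and returns) its new id.
        oldNewOID.put(e.getKey(), altered.createOperatorDescriptorId(e.getValue()));
    }
    // Re-wire each edge through the id map so the copy connects as the original did.
    for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>,
            Pair<IOperatorDescriptor, Integer>>> e : spec.getConnectorOperatorMap().entrySet()) {
        ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(spec.getConnectorMap().get(e.getKey()));
        Pair<IOperatorDescriptor, Integer> left = e.getValue().getLeft();
        Pair<IOperatorDescriptor, Integer> right = e.getValue().getRight();
        altered.connect(altered.getConnectorMap().get(newConnId),
                altered.getOperatorMap().get(oldNewOID.get(left.getLeft().getOperatorId())), left.getRight(),
                altered.getOperatorMap().get(oldNewOID.get(right.getLeft().getOperatorId())), right.getRight());
    }
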
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java
deleted file mode 100644
index 00c9010..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveMessage;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedEventsListener;
-import org.apache.asterix.external.feed.message.EndFeedMessage;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-
-/**
- * Provides helper method(s) for creating JobSpec for operations on a feed.
- */
-public class FeedOperations {
-
-    private FeedOperations() {
-    }
-
-    /**
-     * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor.
-     *
-     * @param primaryFeed
-     * @param metadataProvider
-     * @return JobSpecification the Hyracks job specification for receiving data from external source
-     * @throws Exception
-     */
-    public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed,
-            MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
-        JobSpecification spec = RuntimeUtils.createJobSpecification();
-        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
-        IAdapterFactory adapterFactory;
-        IOperatorDescriptor feedIngestor;
-        AlgebricksPartitionConstraint ingesterPc;
-        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t =
-                metadataProvider.buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
-        feedIngestor = t.first;
-        ingesterPc = t.second;
-        adapterFactory = t.third;
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
-        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
-        spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
-        spec.addRoot(nullSink);
-        return new Pair<>(spec, adapterFactory);
-    }
-
-    /**
-     * Builds the job spec for sending message to an active feed to disconnect it from the
-     * its source.
-     */
-    public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(FeedConnectionId connectionId)
-            throws AlgebricksException {
-
-        JobSpecification spec = RuntimeUtils.createJobSpecification();
-        IOperatorDescriptor feedMessenger;
-        AlgebricksPartitionConstraint messengerPc;
-        List<String> locations = null;
-        FeedRuntimeType sourceRuntimeType;
-        try {
-            FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
-                    .getActiveEntityListener(connectionId.getFeedId());
-            FeedConnectJobInfo cInfo = listener.getFeedConnectJobInfo(connectionId);
-            IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
-            IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
-
-            boolean terminateIntakeJob = false;
-            boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty();
-            if (completeDisconnect) {
-                sourceRuntimeType = FeedRuntimeType.INTAKE;
-                locations = cInfo.getCollectLocations();
-                terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1;
-            } else {
-                locations = cInfo.getComputeLocations();
-                sourceRuntimeType = FeedRuntimeType.COMPUTE;
-            }
-
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec,
-                    connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId());
-
-            feedMessenger = p.first;
-            messengerPc = p.second;
-
-            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
-            NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
-            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
-            spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
-            spec.addRoot(nullSink);
-            return new Pair<>(spec, terminateIntakeJob);
-
-        } catch (AlgebricksException e) {
-            throw new AsterixException(e);
-        }
-
-    }
-
-    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
-            JobSpecification jobSpec, FeedConnectionId feedConenctionId, IActiveMessage feedMessage,
-            Collection<String> locations) throws AlgebricksException {
-        AlgebricksPartitionConstraint partitionConstraint =
-                new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-        FeedMessageOperatorDescriptor feedMessenger =
-                new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId, feedMessage);
-        return new Pair<>(feedMessenger, partitionConstraint);
-    }
-
-    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
-            JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations,
-            FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, EntityId sourceFeedId)
-            throws AlgebricksException {
-        IActiveMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId,
-                completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
-        return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
-    }
-
-    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws AsterixException {
-        JobSpecification spec = RuntimeUtils.createJobSpecification();
-        AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations();
-        Set<String> nodes = new TreeSet<>();
-        for (String node : allCluster.getLocations()) {
-            nodes.add(node);
-        }
-        AlgebricksAbsolutePartitionConstraint locations =
-                new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()]));
-        FileSplit[] feedLogFileSplits =
-                FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
-        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
-        spec.addRoot(frod);
-        return spec;
-    }
-}

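The deleted FeedOperations helpers all share one job-spec skeleton: create the source operator (feed intake or message sender), pin a NullSinkOperatorDescriptor to the same partition constraint, connect the pair one-to-one, and make the sink the root. A condensed sketch of that skeleton follows, assembled purely from calls visible in the deleted class; adapter plumbing and error handling are omitted, so treat it as an illustration rather than the committed code.

    // Types are those imported by the deleted FeedOperations above.
    JobSpecification spec = RuntimeUtils.createJobSpecification();
    // Intake runtime, its partition constraint, and the adapter factory
    // all come from the metadata provider.
    Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t =
            metadataProvider.buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
    AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, t.first, t.second);
    // A null sink on the same partitions gives the job a root without consuming output.
    NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
    AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, t.second);
    spec.connect(new OneToOneConnectorDescriptor(spec), t.first, 0, nullSink, 0);
    spec.addRoot(nullSink);
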
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
index a9c721a..81eaa9b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 
 public class MetadataLockManager {
 
@@ -480,6 +482,33 @@ public class MetadataLockManager {
         releaseDataverseReadLock(dataverseName);
     }
 
+    public void startFeedBegin(String dataverseName, String feedName, List<FeedConnection> feedConnections) {
+        acquireDataverseReadLock(dataverseName);
+        acquireFeedReadLock(feedName);
+        for (FeedConnection feedConnection : feedConnections) {
+            acquireDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName());
+        }
+    }
+
+    public void startFeedEnd(String dataverseName, String feedName, List<FeedConnection> feedConnections) {
+        releaseDataverseReadLock(dataverseName);
+        releaseFeedReadLock(feedName);
+        for (FeedConnection feedConnection : feedConnections) {
+            releaseDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName());
+        }
+    }
+
+    public void StopFeedBegin(String dataverseName, String feedName) {
+        // TODO: dataset lock?
+        acquireDataverseReadLock(dataverseName);
+        acquireFeedReadLock(feedName);
+    }
+
+    public void StopFeedEnd(String dataverseName, String feedName) {
+        releaseDataverseReadLock(dataverseName);
+        releaseFeedReadLock(feedName);
+    }
+
     public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) {
         acquireDataverseReadLock(dataverseName);
         acquireFeedWriteLock(feedFullyQualifiedName);

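The added startFeedBegin/startFeedEnd pair follows the begin/end bracketing convention already used throughout MetadataLockManager: acquire read locks on the dataverse, the feed, and every connected dataset before compiling the start-feed job, then release them when the statement completes. A hedged sketch of the intended call-site shape is below; the surrounding caller is illustrative and not part of this patch, and only the two lock methods come from the hunk above.

    // Hypothetical caller, e.g. the statement translator handling START FEED.
    lockManager.startFeedBegin(dataverseName, feedName, feedConnections);
    try {
        // compile and launch the feed start job while the dataverse, the feed,
        // and all connected datasets are held under read locks
    } finally {
        // releasing in the end-method keeps the lock set balanced even on failure
        lockManager.startFeedEnd(dataverseName, feedName, feedConnections);
    }
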
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
index df6c9b6..565d61b 100644
--- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
+++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
index 39a6272..3be4db8 100644
--- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
+++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013
@@ -30,3 +30,5 @@ use dataverse externallibtest;
 set wait-for-completion-feed "true";
 
 connect feed TestTypedAdapterFeed to dataset TweetsTestAdapter;
+
+start feed TestTypedAdapterFeed;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
index 3df9d2b..2860f17 100644
--- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
+++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
index 35125a1..6e84ea3 100644
--- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
+++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013
@@ -46,8 +46,9 @@ create type TweetOutputType as closed {
 
 create feed TweetFeed
 using file_feed
-(("type-name"="TweetInputType"),("fs"="localfs"),("path"="127.0.0.1://../../../../../../asterix-app/data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
-apply function testlib#parseTweet;
+(("type-name"="TweetInputType"),("fs"="localfs"),
+("path"="127.0.0.1://../../../../../../asterix-app/data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"));
 
-create dataset TweetsFeedIngest(TweetOutputType) 
+
+create dataset TweetsFeedIngest(TweetOutputType)
 primary key id;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
index d5a6f58..db68138 100644
--- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
+++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013
@@ -29,4 +29,5 @@ use dataverse externallibtest;
 
 set wait-for-completion-feed "true";
 
-connect feed TweetFeed to dataset TweetsFeedIngest;
+connect feed TweetFeed to dataset TweetsFeedIngest apply function testlib#parseTweet;
+start feed TweetFeed;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
index b188b52..22d1d27 100644
--- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
+++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013

