asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [6/7] asterixdb-bad git commit: Updated to match code changes to asterix
Date Wed, 07 Dec 2016 20:59:44 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
new file mode 100644
index 0000000..02389f1
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Extension DDL statement that creates a broker in a dataverse and records it as a
+ * {@link Broker} metadata entity.
+ */
+public class CreateBrokerStatement implements IExtensionStatement {
+
+    private static final Logger LOGGER = Logger.getLogger(CreateBrokerStatement.class.getName());
+    private final Identifier dataverseName;
+    private final Identifier brokerName;
+    // Only ever assigned in the constructor, so it is final like the other statement fields.
+    private final String endPointName;
+
+    /**
+     * @param dataverseName
+     *            dataverse identifier given in the statement; it is resolved through
+     *            {@code QueryTranslator.getActiveDataverse} when the statement is handled
+     * @param brokerName
+     *            name of the broker to create
+     * @param endPointName
+     *            end point value stored with the broker entity
+     */
+    public CreateBrokerStatement(Identifier dataverseName, Identifier brokerName, String endPointName) {
+        this.brokerName = brokerName;
+        this.dataverseName = dataverseName;
+        this.endPointName = endPointName;
+    }
+
+    /** @return the end point value passed to the constructor */
+    public String getEndPointName() {
+        return endPointName;
+    }
+
+    /** @return the dataverse identifier passed to the constructor (not yet resolved) */
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    /** @return the identifier of the broker to create */
+    public Identifier getBrokerName() {
+        return brokerName;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    /** Visitors are not supported by this statement; the visitor is ignored and {@code null} is returned. */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    /**
+     * Creates the broker inside a single metadata transaction: looks the broker up, fails if it
+     * already exists, otherwise adds a new {@link Broker} entity and commits.
+     *
+     * @param statementExecutor
+     *            must be a {@link QueryTranslator}; it is cast without a check
+     * @throws HyracksDataException
+     *             wrapping any failure raised inside the transaction, including the
+     *             "already exists" {@link AlgebricksException}, after the transaction was aborted
+     */
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            Broker existing = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue());
+            if (existing != null) {
+                throw new AlgebricksException("A broker with this name " + brokerName + " already exists.");
+            }
+            Broker broker = new Broker(dataverse, brokerName.getValue(), endPointName);
+            MetadataManager.INSTANCE.addEntity(mdTxnCtx, broker);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // mdTxnCtx is still null when beginTransaction() itself failed; nothing to abort then.
+            if (mdTxnCtx != null) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            LOGGER.log(Level.WARNING, "Failed creating a broker", e);
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
new file mode 100644
index 0000000..77de93e
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.file.JobSpecificationUtils;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DatasetDecl;
+import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.util.JobUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
+
+/**
+ * Extension DDL statement that creates a channel. Handling the statement creates the channel's
+ * subscriptions and results datasets, compiles and starts the channel job, and finally stores a
+ * {@link Channel} metadata entity.
+ */
+public class CreateChannelStatement implements IExtensionStatement {
+
+    private static final Logger LOGGER = Logger.getLogger(CreateChannelStatement.class.getName());
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private final FunctionSignature function;
+    private final CallExpr period;
+    // Only ever assigned in the constructor.
+    private final boolean distributed;
+    // The three fields below are filled in by initialize().
+    private String duration;
+    private String subscriptionsTableName;
+    private String resultsTableName;
+    // Never assigned in this class, so getChannelResultsInsertQuery() currently returns null.
+    private InsertStatement channelResultsInsertQuery;
+
+    /**
+     * @param dataverseName
+     *            dataverse identifier given in the statement
+     * @param channelName
+     *            name of the channel to create
+     * @param function
+     *            signature of the function the channel evaluates for every subscription
+     * @param period
+     *            repetition period; must be a {@link CallExpr} (it is cast without a check)
+     * @param distributed
+     *            selects {@code setupDistributedJob} instead of {@code setupCompiledJob} in
+     *            {@link #handle}
+     */
+    public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
+            Expression period, boolean distributed) {
+        this.channelName = channelName;
+        this.dataverseName = dataverseName;
+        this.function = function;
+        this.period = (CallExpr) period;
+        this.duration = "";
+        this.distributed = distributed;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    /** @return the results dataset name, or {@code null} before {@link #initialize} was called */
+    public String getResultsName() {
+        return resultsTableName;
+    }
+
+    /** @return the subscriptions dataset name, or {@code null} before {@link #initialize} was called */
+    public String getSubscriptionsName() {
+        return subscriptionsTableName;
+    }
+
+    /** @return the duration literal, or the empty string before {@link #initialize} was called */
+    public String getDuration() {
+        return duration;
+    }
+
+    public FunctionSignature getFunction() {
+        return function;
+    }
+
+    public Expression getPeriod() {
+        return period;
+    }
+
+    public InsertStatement getChannelResultsInsertQuery() {
+        return channelResultsInsertQuery;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    /** Visitors are not supported by this statement; the visitor is ignored and {@code null} is returned. */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    /**
+     * Checks the statement against the metadata and records the derived values.
+     * <p>
+     * The channel function must exist and the period must be a call to {@code duration}. The first
+     * argument of that call is taken as the duration literal and run through the duration parser.
+     *
+     * @throws MetadataException
+     *             if the function is unknown or the period is not a {@code duration} call
+     * @throws HyracksDataException
+     *             if the duration parser rejects the literal
+     */
+    public void initialize(MetadataTransactionContext mdTxnCtx, String subscriptionsTableName, String resultsTableName)
+            throws MetadataException, HyracksDataException {
+        Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
+        if (lookup == null) {
+            throw new MetadataException(" Unknown function " + function.getName());
+        }
+
+        if (!period.getFunctionSignature().getName().equals("duration")) {
+            throw new MetadataException(
+                    "Expected argument period as a duration, but got " + period.getFunctionSignature().getName() + ".");
+        }
+        duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
+
+        // The parsed bytes are discarded; the call appears to serve only as validation of the literal.
+        IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
+        DataOutputStream discarded = new DataOutputStream(new ByteArrayOutputStream());
+        char[] durationChars = duration.toCharArray();
+        durationParser.parse(durationChars, 0, durationChars.length, discarded);
+
+        this.resultsTableName = resultsTableName;
+        this.subscriptionsTableName = subscriptionsTableName;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    /**
+     * Builds a new job specification whose single root is the repetitive channel operator wrapping
+     * {@code channeljobSpec}.
+     *
+     * @return the new specification together with the partition constraint of the channel operator
+     */
+    public Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildChannelJobSpec(String dataverse,
+            String channelName, String duration, MetadataProvider metadataProvider, JobSpecification channeljobSpec,
+            String strIP, int port) throws Exception {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> runtime = buildChannelRuntime(spec,
+                dataverse, channelName, duration, channeljobSpec, strIP, port);
+        IOperatorDescriptor channelQueryExecuter = runtime.first;
+        AlgebricksPartitionConstraint executerPc = runtime.second;
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, channelQueryExecuter, executerPc);
+        spec.addRoot(channelQueryExecuter);
+        return new Pair<>(spec, runtime.second);
+    }
+
+    /**
+     * Creates the repetitive channel operator and constrains it to the first location reported by
+     * the cluster state manager.
+     */
+    public Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> buildChannelRuntime(
+            JobSpecification jobSpec, String dataverse, String channelName, String duration,
+            JobSpecification channeljobSpec, String strIP, int port) throws Exception {
+        RepetitiveChannelOperatorDescriptor channelOp = new RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
+                channelName, duration, channeljobSpec, strIP, port);
+
+        String partition = ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0];
+        AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                new String[] { partition });
+        return new Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint>(channelOp, partitionConstraint);
+    }
+
+    /**
+     * Creates the subscriptions and the results dataset of the channel. Both are internal datasets in
+     * {@code dataverse}, keyed on the subscription id and the result id respectively.
+     */
+    private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName,
+            Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
+
+        Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
+        Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
+        // The same key indicator list is shared by both dataset declarations.
+        List<Integer> keyIndicators = new ArrayList<>();
+        keyIndicators.add(0);
+
+        //Setup the subscriptions dataset
+        List<String> subscriptionKey = new ArrayList<>();
+        subscriptionKey.add(BADConstants.SubscriptionId);
+        List<List<String>> subscriptionPartitionFields = new ArrayList<>();
+        subscriptionPartitionFields.add(subscriptionKey);
+        IDatasetDetailsDecl subscriptionDetails = new InternalDetailsDecl(subscriptionPartitionFields, keyIndicators,
+                true, null, false);
+        DatasetDecl createSubscriptionsDataset = new DatasetDecl(new Identifier(dataverse), subscriptionsName,
+                new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null, null,
+                new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL,
+                subscriptionDetails, true);
+
+        //Setup the results dataset
+        List<String> resultKey = new ArrayList<>();
+        resultKey.add(BADConstants.ResultId);
+        List<List<String>> resultPartitionFields = new ArrayList<>();
+        resultPartitionFields.add(resultKey);
+        IDatasetDetailsDecl resultDetails = new InternalDetailsDecl(resultPartitionFields, keyIndicators, true, null,
+                false);
+        DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
+                new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
+                new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, resultDetails,
+                true);
+
+        //Run both statements to create datasets
+        QueryTranslator translator = (QueryTranslator) statementExecutor;
+        translator.handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, hcc);
+        translator.handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
+    }
+
+    /**
+     * Generates the AQL insert statement executed by the channel and hands it to the query
+     * translator. The statement joins every subscription with its broker, calls the channel function
+     * with the subscription's parameters and inserts the results into the results dataset.
+     *
+     * @return the job specification returned by the query translator for the insert statement
+     */
+    private JobSpecification createChannelJob(IStatementExecutor statementExecutor, Identifier subscriptionsName,
+            Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
+        StringBuilder builder = new StringBuilder();
+        builder.append("insert into dataset " + dataverse + "." + resultsName + " ");
+        builder.append(" as $a (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n");
+
+        builder.append("for $sub in dataset " + dataverse + "." + subscriptionsName + "\n");
+        builder.append(
+                "for $broker in dataset " + BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + "\n");
+        builder.append("where $broker." + BADConstants.BrokerName + "= $sub." + BADConstants.BrokerName + "\n");
+        builder.append("and $broker." + BADConstants.DataverseName + "= $sub." + BADConstants.DataverseName + "\n");
+        builder.append(" for $result in " + function.getNamespace() + "." + function.getName() + "(");
+        // Emit exactly getArity() arguments. A zero-arity function gets an empty argument list;
+        // always appending a trailing "$sub.param<i>" would call it with one argument.
+        for (int i = 0; i < function.getArity(); i++) {
+            if (i > 0) {
+                builder.append(",");
+            }
+            builder.append("$sub.param" + i);
+        }
+        builder.append(")\n");
+        builder.append("return {\n");
+        builder.append("\"" + BADConstants.ChannelExecutionTime + "\":$" + BADConstants.ChannelExecutionTime + ",");
+        builder.append("\"" + BADConstants.SubscriptionId + "\":$sub." + BADConstants.SubscriptionId + ",");
+        builder.append("\"" + BADConstants.DeliveryTime + "\":current-datetime(),");
+        builder.append("\"result\":$result");
+        builder.append("}");
+        builder.append(")");
+        builder.append(" returning $a");
+        builder.append(";");
+        AQLParserFactory aqlFact = new AQLParserFactory();
+        List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
+        return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
+                hcc, hdc, ResultDelivery.ASYNC, stats, true);
+    }
+
+    /**
+     * Wraps {@code channeljobSpec} in a repetitive channel operator job, tags that job with the
+     * channel's job info and runs it.
+     */
+    private void setupCompiledJob(MetadataProvider metadataProvider, String dataverse, EntityId entityId,
+            JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
+        ICCApplicationContext iCCApp = AsterixAppContextInfo.INSTANCE.getCCApplicationContext();
+        ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
+        String strIP = ccInfo.getClientNetAddress();
+        int port = ccInfo.getClientNetPort();
+        //Create Channel Operator
+        Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> alteredJobSpec = buildChannelJobSpec(dataverse,
+                channelName.getValue(), duration, metadataProvider, channeljobSpec, strIP, port);
+        JobSpecification wrappedSpec = alteredJobSpec.first;
+
+        ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE, wrappedSpec);
+        wrappedSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
+        JobUtils.runJob(hcc, wrappedSpec, false);
+    }
+
+    /**
+     * Tags {@code channeljobSpec} with the channel's job info, starts it with the {@code STORE_JOB}
+     * flag and passes the resulting job id to {@link ChannelJobService} together with the period
+     * derived from the duration.
+     */
+    private void setupDistributedJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc)
+            throws Exception {
+        ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE, channeljobSpec);
+        channeljobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
+        JobId jobId = hcc.startJob(channeljobSpec, EnumSet.of(JobFlag.STORE_JOB));
+        ChannelJobService.startJob(channeljobSpec, EnumSet.of(JobFlag.STORE_JOB), jobId, hcc,
+                ChannelJobService.findPeriod(duration));
+    }
+
+    /**
+     * Creates the channel.
+     *
+     * @param statementExecutor
+     *            must be a {@link QueryTranslator}; it is cast without a check
+     * @throws HyracksDataException
+     *             wrapping any failure raised after the dataverse was resolved, including the
+     *             "already exists" and "already running" checks, after the metadata transaction was
+     *             aborted
+     */
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+        //This function performs three tasks:
+        //1. Create datasets for the Channel
+        //2. Create and run the Channel Job
+        //3. Create the metadata entry for the channel
+
+        //TODO: Figure out how to handle when a subset of the 3 tasks fails
+        //TODO: The compiled job will break if anything changes on the function or two datasets
+        // Need to make sure we do proper checking when altering these things
+
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+
+        Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
+        Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
+        EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
+        ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+                .getActiveEntityListener(entityId);
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+        boolean subscriberRegistered = false;
+        Channel channel = null;
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+            if (channel != null) {
+                throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
+            }
+            if (listener != null) {
+                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+            }
+            if (subscriberRegistered) {
+                throw new AsterixException("Channel " + channelName + " is already running");
+            }
+            initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
+            channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
+                    duration);
+
+            //check if names are available before creating anything
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()) != null) {
+                throw new AsterixException("The channel name:" + channelName + " is not available.");
+            }
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
+                throw new AsterixException("The channel name:" + channelName + " is not available.");
+            }
+
+            // Now we subscribe
+            // NOTE(review): the subscriber (and a freshly registered listener) is not removed again
+            // when a later step fails - confirm whether a clean-up is needed here.
+            if (listener == null) {
+                listener = new ChannelEventsListener(entityId);
+                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+            }
+            listener.registerEventSubscriber(eventSubscriber);
+            subscriberRegistered = true;
+
+            //Create Channel Datasets
+            createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
+                    dataverse);
+
+            //Create Channel Internal Job
+            JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
+                    metadataProvider, hcc, hdc, stats, dataverse);
+
+            if (distributed) {
+                setupDistributedJob(entityId, channeljobSpec, hcc);
+            } else {
+                setupCompiledJob(metadataProvider, dataverse, entityId, channeljobSpec, hcc);
+            }
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+            MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // mdTxnCtx is still null when beginTransaction() itself failed; nothing to abort then.
+            if (mdTxnCtx != null) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            LOGGER.log(Level.WARNING, "Failed creating a channel", e);
+            throw new HyracksDataException(e);
+        }
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
new file mode 100644
index 0000000..3edc7dc
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Extension DDL statement that creates a procedure. Handling the statement compiles the procedure
+ * body as a single AQL statement and stores a {@link Procedure} metadata entity.
+ */
+public class CreateProcedureStatement implements IExtensionStatement {
+
+    private static final Logger LOGGER = Logger.getLogger(CreateProcedureStatement.class.getName());
+
+    private final FunctionSignature signature;
+    private final String functionBody;
+    private final List<String> paramList;
+
+    /** @return the procedure signature; same value as {@link #getSignature()} */
+    public FunctionSignature getaAterixFunction() {
+        return signature;
+    }
+
+    /** @return the procedure body text passed to the constructor */
+    public String getFunctionBody() {
+        return functionBody;
+    }
+
+    /**
+     * @param signature
+     *            namespace, name and arity of the procedure
+     * @param parameterList
+     *            declared parameters; only their string values are kept
+     * @param functionBody
+     *            text of the statement the procedure executes
+     */
+    public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
+            String functionBody) {
+        this.signature = signature;
+        this.functionBody = functionBody;
+        this.paramList = new ArrayList<>();
+        for (VarIdentifier varId : parameterList) {
+            this.paramList.add(varId.getValue());
+        }
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    /** @return the parameter names; this is the internal list, not a copy */
+    public List<String> getParamList() {
+        return paramList;
+    }
+
+    public FunctionSignature getSignature() {
+        return signature;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    /** Visitors are not supported by this statement; the visitor is ignored and {@code null} is returned. */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    /**
+     * Parses {@code body} (terminated with a semicolon) as AQL and hands the statement to the query
+     * translator's insert/upsert handling.
+     *
+     * @return the job specification returned by the query translator
+     * @throws AsterixException
+     *             if the body does not parse to exactly one statement
+     */
+    private JobSpecification createProcedureJob(String body, IStatementExecutor statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
+                    throws Exception {
+        AQLParserFactory aqlFact = new AQLParserFactory();
+        List<Statement> fStatements = aqlFact.createParser(new StringReader(body + ";")).parse();
+        // Reject an empty parse result as well, so that get(0) below can never fail with a bare
+        // IndexOutOfBoundsException.
+        if (fStatements.size() != 1) {
+            throw new AsterixException("Procedure can only execute a single statement");
+        }
+        return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
+                hcc, hdc, ResultDelivery.ASYNC, stats, true);
+    }
+
+    /**
+     * Creates the procedure.
+     *
+     * @param statementExecutor
+     *            must be a {@link QueryTranslator}; it is cast without a check
+     * @throws HyracksDataException
+     *             wrapping any failure raised after the dataverse was resolved, including the
+     *             "already exists" and "already running" checks, after the metadata transaction was
+     *             aborted
+     */
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+        String dataverse =
+                ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
+
+        EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
+        ChannelEventsListener listener =
+                (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+        boolean subscriberRegistered = false;
+        Procedure procedure = null;
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
+                    Integer.toString(signature.getArity()));
+            if (procedure != null) {
+                throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists.");
+            }
+            if (listener != null) {
+                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+            }
+            if (subscriberRegistered) {
+                throw new AsterixException("Procedure " + signature.getName() + " is already running");
+            }
+
+            procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
+                    Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL);
+
+            // Now we subscribe
+            if (listener == null) {
+                listener = new ChannelEventsListener(entityId);
+                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+            }
+            listener.registerEventSubscriber(eventSubscriber);
+            subscriberRegistered = true;
+
+            //Create Procedure Internal Job
+            JobSpecification procedureJobSpec =
+                    createProcedureJob(getFunctionBody(), statementExecutor, metadataProvider, hcc, hdc, stats);
+
+            // TODO: procedureJobSpec is not started here yet; the former call
+            // setupDistributedJob(entityId, <job spec>, hcc) is disabled.
+
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+            MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // mdTxnCtx is still null when beginTransaction() itself failed; nothing to abort then.
+            if (mdTxnCtx != null) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            LOGGER.log(Level.WARNING, "Failed creating a procedure", e);
+            throw new HyracksDataException(e);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
new file mode 100644
index 0000000..7222b1a
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.rmi.RemoteException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
+import org.apache.asterix.metadata.api.IMetadataEntity;
+import org.apache.asterix.metadata.api.IMetadataExtension;
+import org.apache.asterix.metadata.api.IMetadataIndex;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
+import org.apache.asterix.metadata.entities.Datatype;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * {@link IMetadataExtension} implementation for BAD. It exposes the extension id, declares the
+ * BAD dataverse and datatype descriptors as constants, and registers the channel, broker and
+ * procedure extension metadata datasets.
+ */
+public class BADMetadataExtension implements IMetadataExtension {
+
+    /** Identifier of this metadata extension. */
+    public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId(
+            BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
+    /** Descriptor of the BAD dataverse (currently not inserted by {@link #initializeMetadata()}). */
+    public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
+            NonTaggedDataFormat.class.getName(), IMetadataEntity.PENDING_NO_OP);
+
+    /** Datatype descriptor for channel subscription records. */
+    public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+    /** Datatype descriptor for channel result records. */
+    public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
+
+    /** Datatype descriptor for broker metadata records. */
+    public static final Datatype BAD_BROKER_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.RECORD_TYPENAME_BROKER, BADMetadataRecordTypes.BROKER_RECORDTYPE, false);
+
+    /** Datatype descriptor for channel metadata records. */
+    public static final Datatype BAD_CHANNEL_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.RECORD_TYPENAME_CHANNEL, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, false);
+
+    /** Datatype descriptor for procedure metadata records. */
+    public static final Datatype BAD_PROCEDURE_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.RECORD_TYPENAME_PROCEDURE, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, false);
+
+    /**
+     * @return {@link #BAD_METADATA_EXTENSION_ID}
+     */
+    @Override
+    public ExtensionId getId() {
+        return BAD_METADATA_EXTENSION_ID;
+    }
+
+    /**
+     * Accepts configuration arguments; none of them are used by this extension.
+     *
+     * @param args
+     *            configuration key/value pairs (ignored)
+     */
+    @Override
+    public void configure(List<Pair<String, String>> args) {
+        // intentionally empty: the extension has no configurable options
+    }
+
+    /**
+     * @return a new default {@link MetadataTupleTranslatorProvider}
+     */
+    @Override
+    public MetadataTupleTranslatorProvider getMetadataTupleTranslatorProvider() {
+        return new MetadataTupleTranslatorProvider();
+    }
+
+    /**
+     * Lists the extension metadata datasets contributed by BAD.
+     *
+     * @return the channel, broker and procedure datasets, in that order
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public List<ExtensionMetadataDataset> getExtensionIndexes() {
+        try {
+            // PROCEDURE_DATASET is enlisted and inserted by initializeMetadata(), so it has to be
+            // reported here too; otherwise this list disagrees with what is bootstrapped.
+            return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET,
+                    BADMetadataIndexes.PROCEDURE_DATASET);
+        } catch (Throwable th) {
+            // Touching the dataset constants may trigger static initialization of
+            // BADMetadataIndexes; print the failure before propagating it unchanged.
+            th.printStackTrace();
+            throw th;
+        }
+    }
+
+    /**
+     * Enlists the BAD metadata datasets and, when {@link MetadataBootstrap#isNewUniverse()} reports
+     * a new universe, inserts the datasets and the BAD datatypes inside one metadata transaction.
+     * <p>
+     * If any step of that transaction fails, the stack trace is printed, the transaction is aborted
+     * and this method returns normally (the failure is not rethrown).
+     */
+    @Override
+    public void initializeMetadata() throws HyracksDataException, RemoteException, ACIDException {
+        // enlist datasets
+        MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.CHANNEL_DATASET);
+        MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.BROKER_DATASET);
+        MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.PROCEDURE_DATASET);
+        if (MetadataBootstrap.isNewUniverse()) {
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            try {
+                // add metadata datasets
+                MetadataBootstrap.insertMetadataDatasets(mdTxnCtx,
+                        new IMetadataIndex[] { BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET,
+                                BADMetadataIndexes.PROCEDURE_DATASET });
+                // insert default dataverse
+                // TODO prevent user from dropping this dataverse
+                // MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
+                // insert default data type
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);
+                // TODO prevent user from dropping these types
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (Exception e) {
+                // best effort: report the failure and roll back, without failing the caller
+                e.printStackTrace();
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
+        }
+        // local recovery?
+        // nothing for now
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
new file mode 100644
index 0000000..b33dcad
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.Arrays;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+/**
+ * Static definitions of the BAD extension metadata datasets (channel, broker, procedure): their
+ * extension dataset ids, immutable index properties, tuple field counts and the
+ * {@link ExtensionMetadataDataset} instances themselves.
+ * <p>
+ * The constants are initialized in textual order; each dataset constant refers to the id and
+ * properties constants declared above it.
+ */
+public class BADMetadataIndexes {
+
+    // Channel dataset: extension dataset id and immutable properties.
+    // Both numeric arguments use FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID.
+    public static final ExtensionMetadataDatasetId BAD_CHANNEL_INDEX_ID = new ExtensionMetadataDatasetId(
+            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.CHANNEL_EXTENSION_NAME);
+    public static final MetadataIndexImmutableProperties PROPERTIES_CHANNEL = new MetadataIndexImmutableProperties(
+            BADConstants.CHANNEL_EXTENSION_NAME,
+            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID,
+            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID);
+
+    // Broker dataset: uses the next numeric id (first available + 1).
+    public static final ExtensionMetadataDatasetId BAD_BROKER_INDEX_ID = new ExtensionMetadataDatasetId(
+            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.BROKER_KEYWORD);
+    public static final MetadataIndexImmutableProperties PROPERTIES_BROKER = new MetadataIndexImmutableProperties(
+            BADConstants.BROKER_KEYWORD,
+            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1,
+            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1);
+
+    // Procedure dataset: uses the numeric id after that (first available + 2).
+    public static final ExtensionMetadataDatasetId BAD_PROCEDURE_INDEX_ID = new ExtensionMetadataDatasetId(
+            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.PROCEDURE_KEYWORD);
+    public static final MetadataIndexImmutableProperties PROPERTIES_PROCEDURE =
+            new MetadataIndexImmutableProperties(BADConstants.PROCEDURE_KEYWORD,
+                    MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 2,
+                    MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 2);
+
+    // Number of fields per index tuple. Each value is one more than the number of key fields
+    // listed for the dataset below (presumably the extra field is the serialized record payload,
+    // as it is for the broker tuple - confirm for channel/procedure against their translators).
+    public static final int NUM_FIELDS_CHANNEL_IDX = 3;
+    public static final int NUM_FIELDS_BROKER_IDX = 3;
+    public static final int NUM_FIELDS_PROCEDURE_IDX = 4;
+
+    // Channel dataset, keyed by (dataverse name, channel name), both strings.
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ExtensionMetadataDataset CHANNEL_DATASET = new ExtensionMetadataDataset(PROPERTIES_CHANNEL,
+            NUM_FIELDS_CHANNEL_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+            Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+                    Arrays.asList(BADConstants.ChannelName)),
+            0, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, true, new int[] { 0, 1 }, BAD_CHANNEL_INDEX_ID,
+            new ChannelTupleTranslator(true));
+
+    // Broker dataset, keyed by (dataverse name, broker name), both strings.
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ExtensionMetadataDataset BROKER_DATASET = new ExtensionMetadataDataset(PROPERTIES_BROKER,
+            NUM_FIELDS_BROKER_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+            Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+                    Arrays.asList(BADConstants.BrokerName)),
+            0, BADMetadataRecordTypes.BROKER_RECORDTYPE, true, new int[] { 0, 1 }, BAD_BROKER_INDEX_ID,
+            new BrokerTupleTranslator(true));
+
+    // Procedure dataset, keyed by (dataverse name, procedure name, arity), all strings.
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ExtensionMetadataDataset PROCEDURE_DATASET = new ExtensionMetadataDataset(PROPERTIES_PROCEDURE,
+            NUM_FIELDS_PROCEDURE_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+            Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+                    Arrays.asList(BADConstants.ProcedureName), Arrays.asList(BADConstants.FIELD_NAME_ARITY)),
+            0, BADMetadataRecordTypes.PROCEDURE_RECORDTYPE, true, new int[] { 0, 1, 2 }, BAD_PROCEDURE_INDEX_ID,
+            new ProcedureTupleTranslator(true));
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
new file mode 100644
index 0000000..6ee5735
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+/**
+ * Record type definitions used by the BAD extension: the subscription and result record types of
+ * a channel, and the metadata record types (plus field positions) for channels, brokers and
+ * procedures.
+ */
+public class BADMetadataRecordTypes {
+
+    // -------------------------------------- Subscriptions --------------------------------------//
+    private static final String[] subTypeFieldNames = { BADConstants.DataverseName, BADConstants.BrokerName,
+            BADConstants.SubscriptionId };
+    private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
+    /** Open record type of a channel subscription: dataverse name, broker name, subscription id. */
+    public static final ARecordType channelSubscriptionsType = new ARecordType(BADConstants.ChannelSubscriptionsType,
+            subTypeFieldNames, subTypeFieldTypes, true);
+
+    // ---------------------------------------- Results --------------------------------------------//
+    private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
+            BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+    private static final IAType[] resultTypeFieldTypes = { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID,
+            BuiltinType.ADATETIME };
+    /** Open record type of a channel result: result id, execution time, subscription id, delivery time. */
+    public static final ARecordType channelResultsType = new ARecordType(BADConstants.ChannelResultsType,
+            resultTypeFieldNames, resultTypeFieldTypes, true);
+
+    //------------------------------------------ Channel ----------------------------------------//
+    // Positions of the fields inside CHANNEL_RECORDTYPE; they follow the order of the name array.
+    public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
+    public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
+    public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+    public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
+    public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
+    public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
+    /** Open metadata record type describing a channel; all six fields are strings. */
+    public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
+            // RecordTypeName
+            BADConstants.RECORD_TYPENAME_CHANNEL,
+            // FieldNames
+            new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
+                    BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration },
+            // FieldTypes
+            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    BuiltinType.ASTRING, BuiltinType.ASTRING },
+            //IsOpen?
+            true);
+    //------------------------------------------ Broker ----------------------------------------//
+    // Positions of the fields inside BROKER_RECORDTYPE; they follow the order of the name array.
+    public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
+    public static final int BROKER_NAME_FIELD_INDEX = 1;
+    public static final int BROKER_ENDPOINT_FIELD_INDEX = 2;
+    /** Open metadata record type describing a broker; all three fields are strings. */
+    public static final ARecordType BROKER_RECORDTYPE = MetadataRecordTypes.createRecordType(
+            // RecordTypeName
+            BADConstants.RECORD_TYPENAME_BROKER,
+            // FieldNames
+            new String[] { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.BrokerEndPoint },
+            // FieldTypes: exactly one type per field name above (three string fields)
+            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+            //IsOpen?
+            true);
+
+    //----------------------------------------- Procedure ----------------------------------------//
+    // Positions of the fields inside PROCEDURE_RECORDTYPE; they follow the order of the name array.
+    public static final int PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX = 0;
+    public static final int PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX = 1;
+    public static final int PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX = 2;
+    public static final int PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX = 3;
+    public static final int PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX = 4;
+    public static final int PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX = 5;
+    public static final int PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX = 6;
+    /**
+     * Open metadata record type describing a procedure. Every field is a string except the
+     * parameter list, which is an ordered list of strings; note that the arity is typed as a string.
+     */
+    public static final ARecordType PROCEDURE_RECORDTYPE = MetadataRecordTypes.createRecordType(
+            // RecordTypeName
+            BADConstants.RECORD_TYPENAME_PROCEDURE,
+            // FieldNames
+            new String[] { BADConstants.DataverseName, BADConstants.ProcedureName, BADConstants.FIELD_NAME_ARITY,
+                    BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_RETURN_TYPE,
+                    BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE },
+            // FieldTypes
+            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    BuiltinType.ASTRING },
+            //IsOpen?
+            true);
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
new file mode 100644
index 0000000..006f0dc
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Broker.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2015 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+/**
+ * Metadata describing a broker: the dataverse it belongs to, its name and its end point.
+ * <p>
+ * Two brokers are considered equal when they have the same dataverse name and broker name, which
+ * is the key under which brokers are stored in the broker metadata dataset.
+ */
+public class Broker implements IExtensionMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String dataverseName;
+    private final String brokerName;
+    private final String endPointName;
+
+    /**
+     * @param dataverseName
+     *            name of the dataverse the broker belongs to
+     * @param brokerName
+     *            name of the broker
+     * @param endPointName
+     *            end point of the broker
+     */
+    public Broker(String dataverseName, String brokerName, String endPointName) {
+        this.endPointName = endPointName;
+        this.dataverseName = dataverseName;
+        this.brokerName = brokerName;
+    }
+
+    /**
+     * @return the name of the dataverse the broker belongs to
+     */
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    /**
+     * @return the name of the broker
+     */
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    /**
+     * @return the end point of the broker
+     */
+    public String getEndPointName() {
+        return endPointName;
+    }
+
+    /**
+     * Compares brokers by identity key (dataverse name and broker name); the end point does not
+     * take part in the comparison.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Broker)) {
+            return false;
+        }
+        Broker otherBroker = (Broker) other;
+        // The dataverse name is part of the key: same-named brokers in different dataverses differ.
+        return java.util.Objects.equals(dataverseName, otherBroker.dataverseName)
+                && java.util.Objects.equals(brokerName, otherBroker.brokerName);
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}: derived from the dataverse name and the
+     * broker name only.
+     */
+    @Override
+    public int hashCode() {
+        return java.util.Objects.hash(dataverseName, brokerName);
+    }
+
+    /**
+     * @return the id of the broker extension metadata dataset
+     */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
new file mode 100644
index 0000000..b73e9e3
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Search key for looking up a broker in the broker extension metadata dataset. The key is made of
+ * the dataverse name followed by the broker name.
+ */
+public class BrokerSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+
+    /** First key component: the dataverse name. */
+    private final String dataverse;
+    /** Second key component: the broker name. */
+    private final String broker;
+
+    /**
+     * @param dataverse
+     *            dataverse name component of the key
+     * @param broker
+     *            broker name component of the key
+     */
+    public BrokerSearchKey(String dataverse, String broker) {
+        this.broker = broker;
+        this.dataverse = dataverse;
+    }
+
+    /**
+     * @return a tuple built from the dataverse name and the broker name, in that order
+     */
+    @Override
+    public ITupleReference getSearchKey() {
+        final ITupleReference searchTuple = MetadataNode.createTuple(this.dataverse, this.broker);
+        return searchTuple;
+    }
+
+    /**
+     * @return the id of the broker extension metadata dataset
+     */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        final ExtensionMetadataDatasetId brokerDatasetId = BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+        return brokerDatasetId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
new file mode 100644
index 0000000..34397f4
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Broker metadata entity to an ITupleReference and vice versa.
+ * <p>
+ * A broker tuple has three fields: the dataverse name and the broker name (the key), followed by
+ * the broker serialized as a record of type {@link BADMetadataRecordTypes#BROKER_RECORDTYPE}.
+ */
+public class BrokerTupleTranslator extends AbstractTupleTranslator<Broker> {
+    // Field indexes of serialized Broker in a tuple.
+    // Key field.
+    public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
+
+    public static final int BROKER_NAME_FIELD_INDEX = 1;
+
+    // Payload field containing serialized broker.
+    public static final int BROKER_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+    // Serializer/deserializer for the broker record stored in the payload field.
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ARecord> recordSerDes =
+            SerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BADMetadataRecordTypes.BROKER_RECORDTYPE);
+
+    /**
+     * @param getTuple
+     *            passed through to the superclass constructor together with the number of fields
+     *            of a broker tuple
+     */
+    public BrokerTupleTranslator(boolean getTuple) {
+        super(getTuple, BADMetadataIndexes.NUM_FIELDS_BROKER_IDX);
+    }
+
+    /**
+     * Deserializes the payload field of the given tuple into a {@link Broker}.
+     *
+     * @param frameTuple
+     *            tuple whose payload field holds a serialized broker record
+     * @return the broker described by the record
+     * @throws IOException
+     *             if the record cannot be deserialized
+     */
+    @Override
+    public Broker getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+        byte[] serRecord = frameTuple.getFieldData(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordStartOffset = frameTuple.getFieldStart(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordLength = frameTuple.getFieldLength(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+        DataInput in = new DataInputStream(stream);
+        ARecord brokerRecord = recordSerDes.deserialize(in);
+        return createBrokerFromARecord(brokerRecord);
+    }
+
+    /** Builds a {@link Broker} from the string fields of a broker record. */
+    private Broker createBrokerFromARecord(ARecord brokerRecord) {
+        String dataverseName = getStringField(brokerRecord, BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX);
+        String brokerName = getStringField(brokerRecord, BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX);
+        String endPointName = getStringField(brokerRecord, BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX);
+        return new Broker(dataverseName, brokerName, endPointName);
+    }
+
+    /** Reads the record field at the given position as a string. */
+    private static String getStringField(ARecord record, int fieldIndex) {
+        return ((AString) record.getValueByPos(fieldIndex)).getStringValue();
+    }
+
+    /**
+     * Serializes the given broker into the shared tuple: the two key fields first, then the broker
+     * record as payload.
+     *
+     * @param broker
+     *            broker to serialize
+     * @return the translator's tuple, reset to the newly written content
+     */
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(Broker broker) throws IOException, MetadataException {
+        // write the key in the first fields of the tuple
+        tupleBuilder.reset();
+        appendKeyField(broker.getDataverseName());
+        appendKeyField(broker.getBrokerName());
+
+        // build the payload record, one string field per record position (0, 1, 2)
+        recordBuilder.reset(BADMetadataRecordTypes.BROKER_RECORDTYPE);
+        addRecordStringField(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX, broker.getDataverseName());
+        addRecordStringField(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX, broker.getBrokerName());
+        addRecordStringField(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX, broker.getEndPointName());
+
+        // write record
+        recordBuilder.write(tupleBuilder.getDataOutput(), true);
+
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+
+    /** Serializes one string as the next field of the tuple being built. */
+    private void appendKeyField(String value) throws IOException, MetadataException {
+        aString.setValue(value);
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+    }
+
+    /** Serializes one string and adds it to the record being built at the given field position. */
+    private void addRecordStringField(int fieldIndex, String value) throws IOException, MetadataException {
+        fieldValue.reset();
+        aString.setValue(value);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(fieldIndex, fieldValue);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
new file mode 100644
index 0000000..b201af6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2015 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+/**
+ * Metadata describing a channel: its entity id (extension name, dataverse name and channel
+ * name), the names of its subscriptions and results datasets, the function it is based on and
+ * its duration.
+ * <p>
+ * Two channels are considered equal when their entity ids are equal.
+ */
+public class Channel implements IExtensionMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    /** A unique identifier for the channel */
+    protected final EntityId channelId;
+    private final String subscriptionsDatasetName;
+    private final String resultsDatasetName;
+    private final String duration;
+    private final FunctionSignature function;
+
+    /**
+     * @param dataverseName
+     *            dataverse name used for the channel's entity id
+     * @param channelName
+     *            channel name used for the channel's entity id
+     * @param subscriptionsDataset
+     *            name of the subscriptions dataset
+     * @param resultsDataset
+     *            name of the results dataset
+     * @param function
+     *            signature of the channel's function
+     * @param duration
+     *            duration of the channel, as a string
+     */
+    public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+            FunctionSignature function, String duration) {
+        this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+        this.function = function;
+        this.duration = duration;
+        this.resultsDatasetName = resultsDataset;
+        this.subscriptionsDatasetName = subscriptionsDataset;
+    }
+
+    /**
+     * @return the entity id of the channel
+     */
+    public EntityId getChannelId() {
+        return channelId;
+    }
+
+    /**
+     * @return the name of the subscriptions dataset
+     */
+    public String getSubscriptionsDataset() {
+        return subscriptionsDatasetName;
+    }
+
+    /**
+     * @return the name of the results dataset
+     */
+    public String getResultsDatasetName() {
+        return resultsDatasetName;
+    }
+
+    /**
+     * @return the duration of the channel, as a string
+     */
+    public String getDuration() {
+        return duration;
+    }
+
+    /**
+     * @return the signature of the channel's function
+     */
+    public FunctionSignature getFunction() {
+        return function;
+    }
+
+    /**
+     * Compares channels by their entity id only.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Channel)) {
+            return false;
+        }
+        Channel otherChannel = (Channel) other;
+        return otherChannel.channelId.equals(channelId);
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}: derived from the entity id only.
+     */
+    @Override
+    public int hashCode() {
+        return channelId.hashCode();
+    }
+
+    /**
+     * @return the id of the channel extension metadata dataset
+     */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
new file mode 100644
index 0000000..a3c757b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.log4j.Logger;
+
+/**
+ * Listens to active-job events for a single channel, keeps track of the channel's registered jobs and forwards
+ * lifecycle events to registered subscribers.
+ * <p>
+ * Every registered job is recorded in {@code jobs} under its numeric job id and in {@code jobInfos} under the
+ * channel's entity id. Methods that modify these maps or the subscriber list synchronize on this listener; the
+ * plain lookup methods do not.
+ */
+public class ChannelEventsListener implements IActiveEntityEventsListener {
+    private static final Logger LOGGER = Logger.getLogger(ChannelEventsListener.class);
+    private final List<IActiveLifecycleEventSubscriber> subscribers;
+    // Registered jobs, keyed by the numeric id of the job.
+    private final Map<Long, ActiveJob> jobs;
+    // Registered job infos, keyed by the entity id of the channel.
+    private final Map<EntityId, ChannelJobInfo> jobInfos;
+    private final EntityId entityId;
+
+    /**
+     * @param entityId
+     *            id of the channel this listener is responsible for
+     */
+    public ChannelEventsListener(EntityId entityId) {
+        this.entityId = entityId;
+        subscribers = new ArrayList<>();
+        jobs = new HashMap<>();
+        jobInfos = new HashMap<>();
+    }
+
+    /**
+     * Dispatches an active event to the matching handler. Exceptions raised by a handler are logged and not
+     * propagated to the caller.
+     *
+     * @param event
+     *            the event to handle
+     */
+    @Override
+    public void notify(ActiveEvent event) {
+        try {
+            switch (event.getEventKind()) {
+                case JOB_START:
+                    handleJobStartEvent(event);
+                    break;
+                case JOB_FINISH:
+                    handleJobFinishEvent(event);
+                    break;
+                case PARTITION_EVENT:
+                    LOGGER.warn("Partition Channel Event");
+                    break;
+                default:
+                    break;
+
+            }
+        } catch (Exception e) {
+            LOGGER.error("Unhandled Exception", e);
+        }
+    }
+
+    /**
+     * Handles the start of a job. Events for jobs that were never registered (or were already removed) are
+     * logged and ignored instead of failing with a NullPointerException.
+     */
+    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        if (jobInfo == null) {
+            LOGGER.warn("Ignoring job start event for unregistered channel job " + message.getJobId());
+            return;
+        }
+        handleJobStartMessage((ChannelJobInfo) jobInfo);
+    }
+
+    /**
+     * Handles the end of a job. Events for jobs that were never registered (or were already removed) are
+     * logged and ignored instead of failing with a NullPointerException.
+     */
+    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        if (jobInfo == null) {
+            LOGGER.warn("Ignoring job finish event for unregistered channel job " + message.getJobId());
+            return;
+        }
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Channel Job finished for  " + jobInfo);
+        }
+        handleJobFinishMessage((ChannelJobInfo) jobInfo);
+    }
+
+    /**
+     * Removes the finished job from both maps and tells the subscribers whether it failed or ended, based on
+     * the job status reported by the Hyracks client connection.
+     */
+    private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo) throws Exception {
+        EntityId channelJobId = cInfo.getEntityId();
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
+        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+        // Constant-first comparison is null-safe: a missing status is treated as "not failed".
+        boolean failure = JobStatus.FAILURE.equals(info.getStatus());
+
+        jobInfos.remove(channelJobId);
+        jobs.remove(cInfo.getJobId().getId());
+        // notify event listeners
+        ActiveLifecycleEvent event = failure ? ActiveLifecycleEvent.ACTIVE_JOB_FAILED
+                : ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
+        notifyEventSubscribers(event);
+    }
+
+    /**
+     * Delivers the event to every registered subscriber, in registration order.
+     */
+    private void notifyEventSubscribers(ActiveLifecycleEvent event) {
+        for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
+            subscriber.handleEvent(event);
+        }
+    }
+
+    /**
+     * Collects the locations of all {@link RepetitiveChannelOperatorDescriptor} instances of the job, stores
+     * them on the job info and marks the job as {@code ACTIVE}.
+     */
+    private static synchronized void handleJobStartMessage(ChannelJobInfo cInfo) throws Exception {
+        List<OperatorDescriptorId> channelOperatorIds = new ArrayList<>();
+        Map<OperatorDescriptorId, IOperatorDescriptor> operators = cInfo.getSpec().getOperatorMap();
+        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+            IOperatorDescriptor opDesc = entry.getValue();
+            if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
+                channelOperatorIds.add(opDesc.getOperatorId());
+            }
+        }
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
+        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+        List<String> locations = new ArrayList<>();
+        for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
+            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(channelOperatorId);
+            // Looked up by index 0..size-1 -- presumably the keys are partition numbers; verify against Hyracks.
+            int nOperatorInstances = operatorLocations.size();
+            for (int i = 0; i < nOperatorInstances; i++) {
+                locations.add(operatorLocations.get(i));
+            }
+        }
+        cInfo.setLocations(locations);
+        cInfo.setState(ActivityState.ACTIVE);
+    }
+
+    /**
+     * Registers a newly created job for this channel. A failed registration is logged, with its stack trace,
+     * and not propagated.
+     *
+     * @param jobId
+     *            id of the created job
+     * @param spec
+     *            specification of the created job
+     */
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification spec) {
+        try {
+            registerJob(jobId, spec);
+        } catch (Exception e) {
+            LOGGER.error("Unable to register channel job [" + jobId + "] for channel " + entityId, e);
+        }
+    }
+
+    /**
+     * Records the job in state {@code CREATED} and notifies subscribers with {@code ACTIVE_JOB_STARTED}.
+     *
+     * @param jobId
+     *            id of the job to register
+     * @param jobSpec
+     *            specification of the job
+     * @throws IllegalStateException
+     *             if this job id, or a job for this listener's channel, is already registered
+     */
+    public synchronized void registerJob(JobId jobId, JobSpecification jobSpec) {
+        if (jobs.get(jobId.getId()) != null) {
+            throw new IllegalStateException("Channel job already registered");
+        }
+        // jobInfos is keyed by entity id, so the duplicate check must use the entity id, not the job id.
+        if (jobInfos.containsKey(entityId)) {
+            throw new IllegalStateException("Channel job already registered");
+        }
+
+        ChannelJobInfo cInfo = new ChannelJobInfo(entityId, jobId, ActivityState.CREATED, jobSpec);
+        jobs.put(jobId.getId(), cInfo);
+        jobInfos.put(entityId, cInfo);
+
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Registered channel job [" + jobId + "]" + " for channel " + entityId);
+        }
+
+        notifyEventSubscribers(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+    }
+
+    /**
+     * @param activeJobId
+     *            entity id the job was registered under
+     * @return the specification of the registered job
+     * @throws NullPointerException
+     *             if no job is registered under {@code activeJobId}
+     */
+    public JobSpecification getJobSpecification(EntityId activeJobId) {
+        return jobInfos.get(activeJobId).getSpec();
+    }
+
+    /**
+     * @param activeJobId
+     *            entity id the job was registered under
+     * @return the registered job info, or {@code null} if there is none
+     */
+    public ChannelJobInfo getJobInfo(EntityId activeJobId) {
+        return jobInfos.get(activeJobId);
+    }
+
+    /**
+     * Adds a subscriber that will receive subsequent lifecycle events.
+     */
+    public synchronized void registerEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+        subscribers.add(subscriber);
+    }
+
+    /**
+     * Removes a subscriber. Synchronized like {@link #registerEventSubscriber} because both touch the same
+     * non-thread-safe list that is iterated while notifying.
+     */
+    public synchronized void deregisterEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+        subscribers.remove(subscriber);
+    }
+
+    /**
+     * Checks whether the job registered under the given id is in state {@code ACTIVE}; if so, the given
+     * subscriber is registered as a side effect.
+     *
+     * @param activeJobId
+     *            entity id the job was registered under
+     * @param eventSubscriber
+     *            subscriber to register when the channel is active
+     * @return true if a job is registered under {@code activeJobId} and it is active
+     */
+    public synchronized boolean isChannelActive(EntityId activeJobId, IActiveLifecycleEventSubscriber eventSubscriber) {
+        ChannelJobInfo cInfo = jobInfos.get(activeJobId);
+        boolean active = cInfo != null && cInfo.getState().equals(ActivityState.ACTIVE);
+        if (active) {
+            registerEventSubscriber(eventSubscriber);
+        }
+        return active;
+    }
+
+    /**
+     * NOTE(review): the keys of {@code jobInfos} are {@code EntityId}s but are copied into a
+     * {@code FeedConnectionId[]}; unless {@code EntityId} is assignable to {@code FeedConnectionId} this looks
+     * like it fails with an ArrayStoreException whenever a job is registered -- confirm the intended return type.
+     */
+    public FeedConnectionId[] getConnections() {
+        return jobInfos.keySet().toArray(new FeedConnectionId[jobInfos.size()]);
+    }
+
+    /**
+     * @return true while at least one job is registered
+     */
+    @Override
+    public boolean isEntityActive() {
+        return !jobs.isEmpty();
+    }
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    /**
+     * @return true if the dataset lives in this channel's dataverse and is named after the channel with either
+     *         the subscriptions or the results suffix from {@link BADConstants}
+     */
+    @Override
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
+        if (!entityId.getDataverse().equals(dataverseName)) {
+            return false;
+        }
+        String subscriptionsName = entityId.getEntityName() + BADConstants.subscriptionEnding;
+        String resultsName = entityId.getEntityName() + BADConstants.resultsEnding;
+        return datasetName.equals(subscriptionsName) || datasetName.equals(resultsName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
new file mode 100644
index 0000000..679548c
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Search key that identifies a channel by its dataverse name and channel name.
+ */
+public class ChannelSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+    private final String dataverse;
+    private final String channel;
+
+    /**
+     * @param dataverse
+     *            dataverse name of the channel
+     * @param channel
+     *            name of the channel
+     */
+    public ChannelSearchKey(String dataverse, String channel) {
+        this.channel = channel;
+        this.dataverse = dataverse;
+    }
+
+    /**
+     * @return the tuple created by {@code MetadataNode.createTuple} from the dataverse and channel names
+     */
+    @Override
+    public ITupleReference getSearchKey() {
+        return MetadataNode.createTuple(this.dataverse, this.channel);
+    }
+
+    /**
+     * @return {@code BADMetadataIndexes.BAD_CHANNEL_INDEX_ID}
+     */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
new file mode 100644
index 0000000..b9ae250
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Channel metadata entity to an ITupleReference and vice versa.
+ */
+public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
+    // Field indexes of serialized Feed in a tuple.
+    // Key field.
+    public static final int CHANNEL_DATAVERSE_NAME_FIELD_INDEX = 0;
+
+    public static final int CHANNEL_NAME_FIELD_INDEX = 1;
+
+    // Payload field containing serialized feed.
+    public static final int CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
+
+    @SuppressWarnings("unchecked")
+    public ChannelTupleTranslator(boolean getTuple) {
+        super(getTuple, BADMetadataIndexes.NUM_FIELDS_CHANNEL_IDX);
+    }
+
+    @Override
+    public Channel getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+        byte[] serRecord = frameTuple.getFieldData(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordStartOffset = frameTuple.getFieldStart(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordLength = frameTuple.getFieldLength(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
+        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+        DataInput in = new DataInputStream(stream);
+        ARecord channelRecord = recordSerDes.deserialize(in);
+        return createChannelFromARecord(channelRecord);
+    }
+
+    private Channel createChannelFromARecord(ARecord channelRecord) {
+        Channel channel = null;
+        String dataverseName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+        String channelName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX)).getStringValue();
+        String subscriptionsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
+        String resultsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX)).getStringValue();
+        String fName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX)).getStringValue();
+        String duration = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX)).getStringValue();
+
+        FunctionSignature signature = null;
+
+        String[] qnameComponents = fName.split("\\.");
+        String functionDataverse;
+        String functionName;
+        if (qnameComponents.length == 2) {
+            functionDataverse = qnameComponents[0];
+            functionName = qnameComponents[1];
+        } else {
+            functionDataverse = dataverseName;
+            functionName = qnameComponents[0];
+        }
+
+        String[] nameComponents = functionName.split("@");
+        signature = new FunctionSignature(functionDataverse, nameComponents[0], Integer.parseInt(nameComponents[1]));
+
+        channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration);
+        return channel;
+    }
+
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(Channel channel) throws IOException, MetadataException {
+        // write the key in the first fields of the tuple
+
+        tupleBuilder.reset();
+        aString.setValue(channel.getChannelId().getDataverse());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        aString.setValue(channel.getChannelId().getEntityName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        recordBuilder.reset(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(channel.getChannelId().getDataverse());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(channel.getChannelId().getEntityName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 2
+        fieldValue.reset();
+        aString.setValue(channel.getSubscriptionsDataset());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 3
+        fieldValue.reset();
+        aString.setValue(channel.getResultsDatasetName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 4
+        fieldValue.reset();
+        aString.setValue(channel.getFunction().toString());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX, fieldValue);
+
+        // write field 5
+        fieldValue.reset();
+        aString.setValue(channel.getDuration());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX, fieldValue);
+
+        // write record
+        recordBuilder.write(tupleBuilder.getDataOutput(), true);
+
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+}
\ No newline at end of file


Mime
View raw message