asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [3/7] asterixdb-bad git commit: Updated to match code changes to asterix
Date Wed, 07 Dec 2016 20:59:41 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java b/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
deleted file mode 100644
index 4198230..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang;
-
-import java.util.List;
-
-import org.apache.asterix.app.cc.CompilerExtensionManager;
-import org.apache.asterix.app.cc.IStatementExecutorExtension;
-import org.apache.asterix.common.api.ExtensionId;
-import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-
-public class BADQueryTranslatorExtension implements IStatementExecutorExtension {
-
-    public static final ExtensionId BAD_QUERY_TRANSLATOR_EXTENSION_ID = new ExtensionId(
-            BADQueryTranslatorExtension.class.getSimpleName(), 0);
-
-    private static class LazyHolder {
-        private static final IStatementExecutorFactory INSTANCE = new BADQueryTranslatorFactory(
-                (CompilerExtensionManager) AsterixAppContextInfo.INSTANCE.getExtensionManager());
-    }
-
-    @Override
-    public ExtensionId getId() {
-        return BAD_QUERY_TRANSLATOR_EXTENSION_ID;
-    }
-
-    @Override
-    public void configure(List<Pair<String, String>> args) {
-    }
-
-    @Override
-    public IStatementExecutorFactory getQueryTranslatorFactory() {
-        return LazyHolder.INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
deleted file mode 100644
index b8a6050..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang;
-
-import java.util.List;
-
-import org.apache.asterix.app.cc.CompilerExtensionManager;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.app.SessionConfig;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.lang.common.base.Statement;
-
-public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
-
-    public BADQueryTranslatorFactory(CompilerExtensionManager ccExtensionManager) {
-        super(ccExtensionManager);
-    }
-
-    @Override
-    public QueryTranslator create(List<Statement> statements, SessionConfig conf,
-            ILangCompilationProvider compilationProvider) {
-        return new BADStatementExecutor(statements, conf, compilationProvider, cExtensionManager);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
deleted file mode 100644
index 1d4864f..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang;
-
-import java.util.List;
-
-import org.apache.asterix.app.cc.CompilerExtensionManager;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.app.SessionConfig;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.lang.common.base.Statement;
-
-public class BADStatementExecutor extends QueryTranslator {
-
-    public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
-            ILangCompilationProvider compliationProvider, CompilerExtensionManager ccExtensionManager) {
-        super(aqlStatements, conf, compliationProvider, ccExtensionManager);
-    }
-
-    /*
-    @Override
-    protected void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        super(metadataProvider, stmt, hcc);
-        //TODO: need to drop channels and brokers
-        //TODO: need to check if datasets or functions are in use by channels
-    }*/
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
deleted file mode 100644
index 53b5ff7..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang.statement;
-
-import org.apache.asterix.algebra.extension.IExtensionStatement;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.Broker;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class BrokerDropStatement implements IExtensionStatement {
-
-    private final Identifier dataverseName;
-    private final Identifier brokerName;
-    private boolean ifExists;
-
-    public BrokerDropStatement(Identifier dataverseName, Identifier brokerName, boolean ifExists) {
-        this.brokerName = brokerName;
-        this.dataverseName = dataverseName;
-        this.ifExists = ifExists;
-    }
-
-    public boolean getIfExists() {
-        return ifExists;
-    }
-
-    public Identifier getDataverseName() {
-        return dataverseName;
-    }
-
-    public Identifier getBrokerName() {
-        return brokerName;
-    }
-
-    @Override
-    public byte getKind() {
-        return Kind.EXTENSION;
-    }
-
-    @Override
-    public byte getCategory() {
-        return Category.DDL;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
-        return null;
-    }
-
-    @Override
-    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
-            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-        //TODO: dont drop a broker that's being used
-        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
-        MetadataTransactionContext mdTxnCtx = null;
-        try {
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            Broker broker = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue());
-            if (broker == null) {
-                throw new AlgebricksException("A broker with this name " + brokerName + " doesn't exist.");
-            }
-            MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, broker);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            QueryTranslator.abort(e, e, mdTxnCtx);
-            throw new HyracksDataException(e);
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
deleted file mode 100644
index 0faefa3..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang.statement;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.message.ActiveManagerMessage;
-import org.apache.asterix.algebra.extension.IExtensionStatement;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
-import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.ChannelEventsListener;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorNodePushable;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
-import org.apache.asterix.lang.common.statement.DropDatasetStatement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ChannelDropStatement implements IExtensionStatement {
-
-    private final Identifier dataverseName;
-    private final Identifier channelName;
-    private boolean ifExists;
-
-    public ChannelDropStatement(Identifier dataverseName, Identifier channelName, boolean ifExists) {
-        this.dataverseName = dataverseName;
-        this.channelName = channelName;
-        this.ifExists = ifExists;
-    }
-
-    public Identifier getDataverseName() {
-        return dataverseName;
-    }
-
-    public Identifier getChannelName() {
-        return channelName;
-    }
-
-    public boolean getIfExists() {
-        return ifExists;
-    }
-
-    @Override
-    public byte getKind() {
-        return Kind.EXTENSION;
-    }
-
-    @Override
-    public byte getCategory() {
-        return Category.DDL;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
-        return null;
-    }
-
-    @Override
-    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
-            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
-        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
-        boolean txnActive = false;
-        EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
-        ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
-        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
-        boolean subscriberRegistered = false;
-        Channel channel = null;
-
-        MetadataTransactionContext mdTxnCtx = null;
-        try {
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            txnActive = true;
-            channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
-            txnActive = false;
-            if (channel == null) {
-                if (ifExists) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("There is no channel with this name " + channelName + ".");
-                }
-            }
-            if (listener != null) {
-                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
-            }
-            if (!subscriberRegistered) {
-                throw new AsterixException("Channel " + channelName + " is not running");
-            }
-
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-
-            ChannelJobInfo cInfo = listener.getJobInfo(channel.getChannelId());;
-            Set<String> ncs = new HashSet<>(cInfo.getLocations());
-            AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
-                    ncs.toArray(new String[ncs.size()]));
-            int partition = 0;
-            for (String location : locations.getLocations()) {
-                messageBroker.sendApplicationMessageToNC(
-                        new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "cc",
-                                new ActiveRuntimeId(channel.getChannelId(),
-                                        RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition++)),
-                        location);
-            }
-            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_ENDED);
-
-            //Drop the Channel Datasets
-            //TODO: Need to find some way to handle if this fails.
-            //TODO: Prevent datasets for Channels from being dropped elsewhere
-            DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
-                    new Identifier(channel.getResultsDatasetName()), true);
-            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
-
-            dropStmt = new DropDatasetStatement(new Identifier(dataverse),
-                    new Identifier(channel.getSubscriptionsDataset()), true);
-            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
-
-            if (subscriberRegistered) {
-                listener.deregisterEventSubscriber(eventSubscriber);
-            }
-
-            //Remove the Channel Metadata
-            MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (txnActive) {
-                QueryTranslator.abort(e, e, mdTxnCtx);
-            }
-            throw new HyracksDataException(e);
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
deleted file mode 100644
index 7d0cb1a..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang.statement;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.algebra.extension.IExtensionStatement;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.Broker;
-import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.lang.common.base.Expression;
-import org.apache.asterix.lang.common.expression.CallExpr;
-import org.apache.asterix.lang.common.expression.FieldBinding;
-import org.apache.asterix.lang.common.expression.LiteralExpr;
-import org.apache.asterix.lang.common.expression.RecordConstructor;
-import org.apache.asterix.lang.common.literal.StringLiteral;
-import org.apache.asterix.lang.common.statement.InsertStatement;
-import org.apache.asterix.lang.common.statement.Query;
-import org.apache.asterix.lang.common.statement.UpsertStatement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ChannelSubscribeStatement implements IExtensionStatement {
-
-    private final Identifier dataverseName;
-    private final Identifier channelName;
-    private final Identifier brokerDataverseName;
-    private final Identifier brokerName;
-    private final List<Expression> argList;
-    private final String subscriptionId;
-    private final int varCounter;
-
-    public ChannelSubscribeStatement(Identifier dataverseName, Identifier channelName, List<Expression> argList,
-            int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId) {
-        this.channelName = channelName;
-        this.dataverseName = dataverseName;
-        this.brokerDataverseName = brokerDataverseName;
-        this.brokerName = brokerName;
-        this.argList = argList;
-        this.subscriptionId = subscriptionId;
-        this.varCounter = varCounter;
-    }
-
-    public Identifier getDataverseName() {
-        return dataverseName;
-    }
-
-    public Identifier getBrokerDataverseName() {
-        return brokerDataverseName;
-    }
-
-    public Identifier getChannelName() {
-        return channelName;
-    }
-
-    public Identifier getBrokerName() {
-        return brokerName;
-    }
-
-    public List<Expression> getArgList() {
-        return argList;
-    }
-
-    public int getVarCounter() {
-        return varCounter;
-    }
-
-    public String getSubscriptionId() {
-        return subscriptionId;
-    }
-
-    @Override
-    public byte getKind() {
-        return Kind.EXTENSION;
-    }
-
-    @Override
-    public byte getCategory() {
-        return Category.QUERY;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
-        return null;
-    }
-
-    @Override
-    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
-            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
-        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
-        String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);
-
-        MetadataTransactionContext mdTxnCtx = null;
-        try {
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-
-            Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
-            if (channel == null) {
-                throw new AsterixException("There is no channel with this name " + channelName + ".");
-            }
-            Broker broker = BADLangExtension.getBroker(mdTxnCtx, brokerDataverse, brokerName.getValue());
-            if (broker == null) {
-                throw new AsterixException("There is no broker with this name " + brokerName + ".");
-            }
-
-            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
-
-            if (argList.size() != channel.getFunction().getArity()) {
-                throw new AsterixException("Channel expected " + channel.getFunction().getArity()
-                        + " parameters but got " + argList.size());
-            }
-
-            Query subscriptionTuple = new Query(false);
-
-            List<FieldBinding> fb = new ArrayList<FieldBinding>();
-            LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
-            Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
-            fb.add(new FieldBinding(leftExpr, rightExpr));
-
-            leftExpr = new LiteralExpr(new StringLiteral(BADConstants.BrokerName));
-            rightExpr = new LiteralExpr(new StringLiteral(broker.getBrokerName()));
-            fb.add(new FieldBinding(leftExpr, rightExpr));
-
-            if (subscriptionId != null) {
-                leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
-
-                List<Expression> UUIDList = new ArrayList<Expression>();
-                UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
-                FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
-                FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
-                        function.getArity());
-                CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
-
-                rightExpr = UUIDCall;
-                fb.add(new FieldBinding(leftExpr, rightExpr));
-            }
-
-            for (int i = 0; i < argList.size(); i++) {
-                leftExpr = new LiteralExpr(new StringLiteral("param" + i));
-                rightExpr = argList.get(i);
-                fb.add(new FieldBinding(leftExpr, rightExpr));
-            }
-            RecordConstructor recordCon = new RecordConstructor(fb);
-            subscriptionTuple.setBody(recordCon);
-
-            subscriptionTuple.setVarCounter(varCounter);
-
-            if (subscriptionId == null) {
-                List<String> returnField = new ArrayList<>();
-                returnField.add(BADConstants.SubscriptionId);
-                metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
-                metadataProvider.setResultAsyncMode(
-                        resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
-                InsertStatement insert = new InsertStatement(new Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, false, returnField);
-                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc,
-                        resultDelivery, stats, false);
-            } else {
-                UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter);
-                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc,
-                        resultDelivery, stats, false);
-            }
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            QueryTranslator.abort(e, e, mdTxnCtx);
-            throw new HyracksDataException(e);
-        }
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
deleted file mode 100644
index 50696b4..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang.statement;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.algebra.extension.IExtensionStatement;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor;
-import org.apache.asterix.lang.common.base.Expression;
-import org.apache.asterix.lang.common.expression.CallExpr;
-import org.apache.asterix.lang.common.expression.FieldAccessor;
-import org.apache.asterix.lang.common.expression.LiteralExpr;
-import org.apache.asterix.lang.common.expression.OperatorExpr;
-import org.apache.asterix.lang.common.expression.VariableExpr;
-import org.apache.asterix.lang.common.literal.StringLiteral;
-import org.apache.asterix.lang.common.statement.DeleteStatement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ChannelUnsubscribeStatement implements IExtensionStatement {
-
-    private final Identifier dataverseName;
-    private final Identifier channelName;
-    private final String subscriptionId;
-    private final int varCounter;
-    private VariableExpr vars;
-    private List<String> dataverses;
-    private List<String> datasets;
-
-    public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName,
-            String subscriptionId, int varCounter, List<String> dataverses, List<String> datasets) {
-        this.vars = vars;
-        this.channelName = channelName;
-        this.dataverseName = dataverseName;
-        this.subscriptionId = subscriptionId;
-        this.varCounter = varCounter;
-        this.dataverses = dataverses;
-        this.datasets = datasets;
-    }
-
-    public Identifier getDataverseName() {
-        return dataverseName;
-    }
-
-    public VariableExpr getVariableExpr() {
-        return vars;
-    }
-
-    public Identifier getChannelName() {
-        return channelName;
-    }
-
-    public String getsubScriptionId() {
-        return subscriptionId;
-    }
-
-    public List<String> getDataverses() {
-        return dataverses;
-    }
-
-    public List<String> getDatasets() {
-        return datasets;
-    }
-
-    public int getVarCounter() {
-        return varCounter;
-    }
-
-    @Override
-    public byte getKind() {
-        return Kind.EXTENSION;
-    }
-
-    @Override
-    public byte getCategory() {
-        return Category.UPDATE;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
-        return null;
-    }
-
-    @Override
-    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
-            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
-
-        MetadataTransactionContext mdTxnCtx = null;
-        try {
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-
-            Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
-            if (channel == null) {
-                throw new AsterixException("There is no channel with this name " + channelName + ".");
-            }
-
-            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
-
-            //Need a condition to say subscription-id = sid
-            OperatorExpr condition = new OperatorExpr();
-            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
-            condition.addOperand(fa);
-            condition.setCurrentop(true);
-            condition.addOperator("=");
-
-            List<Expression> UUIDList = new ArrayList<Expression>();
-            UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
-
-            FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
-            FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
-                    function.getArity());
-            CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
-
-            condition.addOperand(UUIDCall);
-
-            DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
-                    new Identifier(subscriptionsDatasetName), condition, varCounter, dataverses, datasets);
-            AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
-            delete.accept(visitor, null);
-
-            ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            QueryTranslator.abort(e, e, mdTxnCtx);
-            throw new HyracksDataException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
deleted file mode 100644
index a4d0eaf..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang.statement;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.algebra.extension.IExtensionStatement;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.Broker;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class CreateBrokerStatement implements IExtensionStatement {
-
-    private static final Logger LOGGER = Logger.getLogger(CreateBrokerStatement.class.getName());
-    private final Identifier dataverseName;
-    private final Identifier brokerName;
-    private String endPointName;
-
-    public CreateBrokerStatement(Identifier dataverseName, Identifier brokerName, String endPointName) {
-        this.brokerName = brokerName;
-        this.dataverseName = dataverseName;
-        this.endPointName = endPointName;
-    }
-
-    public String getEndPointName() {
-        return endPointName;
-    }
-
-    public Identifier getDataverseName() {
-        return dataverseName;
-    }
-
-    public Identifier getBrokerName() {
-        return brokerName;
-    }
-
-    @Override
-    public byte getKind() {
-        return Kind.EXTENSION;
-    }
-
-    @Override
-    public byte getCategory() {
-        return Category.DDL;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
-        return null;
-    }
-
-    @Override
-    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
-            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
-        MetadataTransactionContext mdTxnCtx = null;
-        try {
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            Broker broker = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue());
-            if (broker != null) {
-                throw new AlgebricksException("A broker with this name " + brokerName + " already exists.");
-            }
-            broker = new Broker(dataverse, brokerName.getValue(), endPointName);
-            MetadataManager.INSTANCE.addEntity(mdTxnCtx, broker);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            if (mdTxnCtx != null) {
-                QueryTranslator.abort(e, e, mdTxnCtx);
-            }
-            LOGGER.log(Level.WARNING, "Failed creating a broker", e);
-            throw new HyracksDataException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
deleted file mode 100644
index 824e725..0000000
--- a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.lang.statement;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.algebra.extension.IExtensionStatement;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
-import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.ChannelEventsListener;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
-import org.apache.asterix.file.JobSpecificationUtils;
-import org.apache.asterix.lang.aql.parser.AQLParserFactory;
-import org.apache.asterix.lang.common.base.Expression;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.expression.CallExpr;
-import org.apache.asterix.lang.common.expression.LiteralExpr;
-import org.apache.asterix.lang.common.literal.StringLiteral;
-import org.apache.asterix.lang.common.statement.DatasetDecl;
-import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
-import org.apache.asterix.lang.common.statement.InsertStatement;
-import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.Function;
-import org.apache.asterix.om.base.temporal.ADurationParserFactory;
-import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.asterix.runtime.util.AsterixClusterProperties;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.util.JobUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.application.ICCApplicationContext;
-import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
-
-public class CreateChannelStatement implements IExtensionStatement {
-
-    private static final Logger LOGGER = Logger.getLogger(CreateChannelStatement.class.getName());
-
-    private final Identifier dataverseName;
-    private final Identifier channelName;
-    private final FunctionSignature function;
-    private final CallExpr period;
-    private String duration;
-    private InsertStatement channelResultsInsertQuery;
-    private String subscriptionsTableName;
-    private String resultsTableName;
-
-    public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
-            Expression period) {
-        this.channelName = channelName;
-        this.dataverseName = dataverseName;
-        this.function = function;
-        this.period = (CallExpr) period;
-        this.duration = "";
-    }
-
-    public Identifier getDataverseName() {
-        return dataverseName;
-    }
-
-    public Identifier getChannelName() {
-        return channelName;
-    }
-
-    public String getResultsName() {
-        return resultsTableName;
-    }
-
-    public String getSubscriptionsName() {
-        return subscriptionsTableName;
-    }
-
-    public String getDuration() {
-        return duration;
-    }
-
-    public FunctionSignature getFunction() {
-        return function;
-    }
-
-    public Expression getPeriod() {
-        return period;
-    }
-
-    public InsertStatement getChannelResultsInsertQuery() {
-        return channelResultsInsertQuery;
-    }
-
-    @Override
-    public byte getCategory() {
-        return Category.DDL;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
-        return null;
-    }
-
-    public void initialize(MetadataTransactionContext mdTxnCtx, String subscriptionsTableName, String resultsTableName)
-            throws MetadataException, HyracksDataException {
-        Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
-        if (lookup == null) {
-            throw new MetadataException(" Unknown function " + function.getName());
-        }
-
-        if (!period.getFunctionSignature().getName().equals("duration")) {
-            throw new MetadataException(
-                    "Expected argument period as a duration, but got " + period.getFunctionSignature().getName() + ".");
-        }
-        duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
-        IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream outputStream = new DataOutputStream(bos);
-        durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
-        this.resultsTableName = resultsTableName;
-        this.subscriptionsTableName = subscriptionsTableName;
-
-    }
-
-    @Override
-    public byte getKind() {
-        return Kind.EXTENSION;
-    }
-
-    public Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildChannelJobSpec(String dataverse,
-            String channelName, String duration, AqlMetadataProvider metadataProvider, JobSpecification channeljobSpec,
-            String strIP, int port) throws Exception {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IOperatorDescriptor channelQueryExecuter;
-        AlgebricksPartitionConstraint executerPc;
-
-        Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> p = buildChannelRuntime(spec, dataverse,
-                channelName, duration, channeljobSpec, strIP, port);
-        channelQueryExecuter = p.first;
-        executerPc = p.second;
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, channelQueryExecuter, executerPc);
-        spec.addRoot(channelQueryExecuter);
-        return new Pair<>(spec, p.second);
-
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> buildChannelRuntime(
-            JobSpecification jobSpec, String dataverse, String channelName, String duration,
-            JobSpecification channeljobSpec, String strIP, int port) throws Exception {
-        RepetitiveChannelOperatorDescriptor channelOp = new RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
-                channelName, duration, channeljobSpec, strIP, port);
-
-        String partition = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations()[0];
-        Set<String> ncs = new HashSet<>(Arrays.asList(partition));
-        AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
-                ncs.toArray(new String[ncs.size()]));
-        return new Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint>(channelOp, partitionConstraint);
-    }
-
-    private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName,
-            Identifier resultsName, AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, Stats stats, String dataverse) throws AsterixException, Exception {
-
-        Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
-        Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
-        //Setup the subscriptions dataset
-        List<List<String>> partitionFields = new ArrayList<List<String>>();
-        List<Integer> keyIndicators = new ArrayList<Integer>();
-        keyIndicators.add(0);
-        List<String> fieldNames = new ArrayList<String>();
-        fieldNames.add(BADConstants.SubscriptionId);
-        partitionFields.add(fieldNames);
-        IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
-        DatasetDecl createSubscriptionsDataset = new DatasetDecl(new Identifier(dataverse), subscriptionsName,
-                new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null, null,
-                new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
-
-        //Setup the results dataset
-        partitionFields = new ArrayList<List<String>>();
-        fieldNames = new ArrayList<String>();
-        fieldNames.add(BADConstants.ResultId);
-        partitionFields.add(fieldNames);
-        idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
-        DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
-                new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
-                new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
-
-        //Run both statements to create datasets
-        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
-                hcc);
-        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
-
-    }
-
-    private JobSpecification createChannelJob(IStatementExecutor statementExecutor, Identifier subscriptionsName,
-            Identifier resultsName, AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
-        StringBuilder builder = new StringBuilder();
-        builder.append("insert into dataset " + dataverse + "." + resultsName + " ");
-        builder.append(" (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n");
-
-        builder.append("for $sub in dataset " + dataverse + "." + subscriptionsName + "\n");
-        builder.append(
-                "for $broker in dataset " + BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + "\n");
-        builder.append("where $broker." + BADConstants.BrokerName + "= $sub." + BADConstants.BrokerName + "\n");
-        builder.append("and $broker." + BADConstants.DataverseName + "= $sub." + BADConstants.DataverseName + "\n");
-        builder.append(" for $result in " + function.getNamespace() + "." + function.getName() + "(");
-        int i = 0;
-        for (; i < function.getArity() - 1; i++) {
-            builder.append("$sub.param" + i + ",");
-        }
-        builder.append("$sub.param" + i + ")\n");
-        builder.append("return {\n");
-        builder.append("\"" + BADConstants.ChannelExecutionTime + "\":$" + BADConstants.ChannelExecutionTime + ",");
-        builder.append("\"" + BADConstants.SubscriptionId + "\":$sub." + BADConstants.SubscriptionId + ",");
-        builder.append("\"" + BADConstants.DeliveryTime + "\":current-datetime(),");
-        builder.append("\"result\":$result");
-        builder.append("}");
-        builder.append(")");
-        builder.append(" return records");
-        builder.append(";");
-        AQLParserFactory aqlFact = new AQLParserFactory();
-        List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
-        return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
-                hcc, hdc, ResultDelivery.ASYNC, stats, true);
-    }
-
-    @Override
-    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
-            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
-
-        //This function performs three tasks:
-        //1. Create datasets for the Channel
-        //2. Create the compiled Channel Job
-        //3. Create the metadata entry for the channel
-
-        //TODO: Figure out how to handle when a subset of the 3 tasks fails
-        //TODO: The compiled job will break if anything changes to the function or two datasets
-        // Need to make sure we do proper checking when altering these things
-
-        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
-
-        Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
-        Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
-        EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
-        ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
-        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
-        boolean subscriberRegistered = false;
-        Channel channel = null;
-
-        MetadataTransactionContext mdTxnCtx = null;
-        try {
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
-            if (channel != null) {
-                throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
-            }
-            if (listener != null) {
-                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
-            }
-            if (subscriberRegistered) {
-                throw new AsterixException("Channel " + channelName + " is already running");
-            }
-            initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
-            channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
-                    duration);
-
-            //check if names are available before creating anything
-            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()) != null) {
-                throw new AsterixException("The channel name:" + channelName + " is not available.");
-            }
-            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
-                throw new AsterixException("The channel name:" + channelName + " is not available.");
-            }
-
-            // Now we subscribe
-            if (listener == null) {
-                listener = new ChannelEventsListener(entityId);
-                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
-            }
-            listener.registerEventSubscriber(eventSubscriber);
-            subscriberRegistered = true;
-
-            //Create Channel Datasets
-            createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
-                    dataverse);
-
-            //Create Channel Internal Job
-            JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
-                    metadataProvider, hcc, hdc, stats, dataverse);
-
-            //Create Channel Operator
-            ICCApplicationContext iCCApp = AsterixAppContextInfo.INSTANCE.getCCApplicationContext();
-            ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
-            String strIP = ccInfo.getClientNetAddress();
-            int port = ccInfo.getClientNetPort();
-            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> alteredJobSpec = buildChannelJobSpec(
-                    dataverse, channelName.getValue(), duration, metadataProvider, channeljobSpec, strIP, port);
-
-            ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE,
-                    alteredJobSpec.first);
-            alteredJobSpec.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
-            JobUtils.runJob(hcc, alteredJobSpec.first, false);
-
-            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
-
-            MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            if (mdTxnCtx != null) {
-                QueryTranslator.abort(e, e, mdTxnCtx);
-            }
-            LOGGER.log(Level.WARNING, "Failed creating a channel", e);
-            throw new HyracksDataException(e);
-        }
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
deleted file mode 100644
index 05ab4c6..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.metadata;
-
-import java.rmi.RemoteException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.api.ExtensionId;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
-import org.apache.asterix.metadata.api.IMetadataEntity;
-import org.apache.asterix.metadata.api.IMetadataExtension;
-import org.apache.asterix.metadata.api.IMetadataIndex;
-import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
-import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Metadata extension for BAD. It exposes the channel and broker extension metadata datasets
- * and, when the universe is new, inserts those datasets and the BAD datatypes into the
- * metadata inside a single metadata transaction.
- */
-public class BADMetadataExtension implements IMetadataExtension {
-
-    public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId(
-            BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
-    public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
-            NonTaggedDataFormat.class.getName(), IMetadataEntity.PENDING_NO_OP);
-
-    public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
-            BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
-    public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
-            BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
-
-    public static final Datatype BAD_BROKER_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
-            BADConstants.RECORD_TYPENAME_BROKER, BADMetadataRecordTypes.BROKER_RECORDTYPE, false);
-
-    public static final Datatype BAD_CHANNEL_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
-            BADConstants.RECORD_TYPENAME_CHANNEL, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, false);
-
-    @Override
-    public ExtensionId getId() {
-        return BAD_METADATA_EXTENSION_ID;
-    }
-
-    @Override
-    public void configure(List<Pair<String, String>> args) {
-        // This extension does not read any configuration arguments.
-    }
-
-    @Override
-    public MetadataTupleTranslatorProvider getMetadataTupleTranslatorProvider() {
-        return new MetadataTupleTranslatorProvider();
-    }
-
-    /**
-     * @return the channel and broker extension metadata datasets, in that order
-     */
-    @SuppressWarnings("rawtypes")
-    @Override
-    public List<ExtensionMetadataDataset> getExtensionIndexes() {
-        return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET);
-    }
-
-    /**
-     * Enlists the channel and broker metadata datasets and, on a new universe, inserts the
-     * datasets and the BAD datatypes in one metadata transaction.
-     *
-     * @throws HyracksDataException
-     *             if inserting the metadata fails; the transaction is aborted first and the
-     *             original failure is attached as the cause
-     */
-    @Override
-    public void initializeMetadata() throws HyracksDataException, RemoteException, ACIDException {
-        // enlist datasets
-        MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.CHANNEL_DATASET);
-        MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.BROKER_DATASET);
-        if (MetadataBootstrap.isNewUniverse()) {
-            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            try {
-                // add metadata datasets
-                MetadataBootstrap.insertMetadataDatasets(mdTxnCtx,
-                        new IMetadataIndex[] { BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET });
-                // TODO insert the default dataverse (BAD_DATAVERSE) and prevent users from dropping it
-                // insert default data types
-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
-                // TODO prevent user from dropping these types
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            } catch (Exception e) {
-                // Roll back, then surface the failure: swallowing it would leave a new universe
-                // without the BAD datasets/types while the caller believes bootstrap succeeded.
-                try {
-                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                } catch (Exception abortFailure) {
-                    e.addSuppressed(abortFailure);
-                }
-                throw new HyracksDataException(e);
-            }
-        }
-        // local recovery?
-        // nothing for now
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
deleted file mode 100644
index 848fe78..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.metadata;
-
-import java.util.Arrays;
-
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
-import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
-import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-
-public class BADMetadataIndexes {
-    // Channel metadata dataset: its extension dataset id and immutable properties (first available extension dataset id).
-    public static final ExtensionMetadataDatasetId BAD_CHANNEL_INDEX_ID = new ExtensionMetadataDatasetId(
-            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.CHANNEL_EXTENSION_NAME);
-    public static final MetadataIndexImmutableProperties PROPERTIES_CHANNEL = new MetadataIndexImmutableProperties(
-            BADConstants.CHANNEL_EXTENSION_NAME,
-            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID,
-            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID);
-    // Broker metadata dataset: same shape as the channel entries, using the next id (first available + 1).
-    public static final ExtensionMetadataDatasetId BAD_BROKER_INDEX_ID = new ExtensionMetadataDatasetId(
-            BADMetadataExtension.BAD_METADATA_EXTENSION_ID, BADConstants.BROKER_KEYWORD);
-    public static final MetadataIndexImmutableProperties PROPERTIES_BROKER = new MetadataIndexImmutableProperties(
-            BADConstants.BROKER_KEYWORD,
-            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1,
-            MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1);
-    // Field counts passed to the datasets below; BrokerTupleTranslator also hands NUM_FIELDS_BROKER_IDX to its superclass.
-    public static final int NUM_FIELDS_CHANNEL_IDX = 3;
-    public static final int NUM_FIELDS_BROKER_IDX = 3;
-    // Channel dataset over CHANNEL_RECORDTYPE; the two ASTRING fields (dataverse name, channel name) are presumably the key - confirm.
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static final ExtensionMetadataDataset CHANNEL_DATASET = new ExtensionMetadataDataset(PROPERTIES_CHANNEL,
-            NUM_FIELDS_CHANNEL_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
-            Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
-                    Arrays.asList(BADConstants.ChannelName)),
-            0, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, true, new int[] { 0, 1 }, BAD_CHANNEL_INDEX_ID,
-            new ChannelTupleTranslator(true));
-    // Broker dataset over BROKER_RECORDTYPE; the two ASTRING fields (dataverse name, broker name) are presumably the key - confirm.
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static final ExtensionMetadataDataset BROKER_DATASET = new ExtensionMetadataDataset(PROPERTIES_BROKER,
-            NUM_FIELDS_BROKER_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
-            Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
-                    Arrays.asList(BADConstants.BrokerName)),
-            0, BADMetadataRecordTypes.BROKER_RECORDTYPE, true, new int[] { 0, 1 }, BAD_BROKER_INDEX_ID,
-            new BrokerTupleTranslator(true));
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
deleted file mode 100644
index cec98d0..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.metadata;
-
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-
-/**
- * Record types used by the BAD metadata: channel subscriptions, channel results, and the
- * channel and broker metadata records, together with the field positions of the latter two.
- */
-public class BADMetadataRecordTypes {
-
-    // -------------------------------------- Subscriptions --------------------------------------//
-    private static final String[] subTypeFieldNames = { BADConstants.DataverseName, BADConstants.BrokerName,
-            BADConstants.SubscriptionId };
-    private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
-    public static final ARecordType channelSubscriptionsType = new ARecordType(BADConstants.ChannelSubscriptionsType,
-            subTypeFieldNames, subTypeFieldTypes, true);
-
-    // ---------------------------------------- Results --------------------------------------------//
-    private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
-            BADConstants.SubscriptionId, BADConstants.DeliveryTime };
-    private static final IAType[] resultTypeFieldTypes = { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID,
-            BuiltinType.ADATETIME };
-    public static final ARecordType channelResultsType = new ARecordType(BADConstants.ChannelResultsType,
-            resultTypeFieldNames, resultTypeFieldTypes, true);
-
-    //------------------------------------------ Channel ----------------------------------------//
-    // Positions of the fields in CHANNEL_RECORDTYPE; they follow the order of the field-name array below.
-    public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
-    public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
-    public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
-    public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
-    public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
-    public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
-    public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
-            // RecordTypeName
-            BADConstants.RECORD_TYPENAME_CHANNEL,
-            // FieldNames
-            new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
-                    BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration },
-            // FieldTypes
-            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                    BuiltinType.ASTRING, BuiltinType.ASTRING },
-            //IsOpen?
-            true);
-    //------------------------------------------ Broker ----------------------------------------//
-    // Positions of the fields in BROKER_RECORDTYPE; they follow the order of the field-name array below.
-    public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
-    public static final int BROKER_NAME_FIELD_INDEX = 1;
-    public static final int BROKER_ENDPOINT_FIELD_INDEX = 2;
-    public static final ARecordType BROKER_RECORDTYPE = MetadataRecordTypes.createRecordType(
-            // RecordTypeName
-            BADConstants.RECORD_TYPENAME_BROKER,
-            // FieldNames
-            new String[] { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.BrokerEndPoint },
-            // FieldTypes: exactly one per field name (the broker record has three fields, not six)
-            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
-            //IsOpen?
-            true);
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/Broker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/Broker.java b/src/main/java/org/apache/asterix/bad/metadata/Broker.java
deleted file mode 100644
index 006f0dc..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/Broker.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2009-2015 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.asterix.bad.metadata;
-
-import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
-import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
-
-/**
- * Metadata describing a broker: the dataverse it belongs to, its name and its end point.
- * Instances are immutable.
- */
-public class Broker implements IExtensionMetadataEntity {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String dataverseName;
-    private final String brokerName;
-    private final String endPointName;
-
-    public Broker(String dataverseName, String brokerName, String endPointName) {
-        this.endPointName = endPointName;
-        this.dataverseName = dataverseName;
-        this.brokerName = brokerName;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public String getBrokerName() {
-        return brokerName;
-    }
-
-    public String getEndPointName() {
-        return endPointName;
-    }
-
-    /**
-     * Two brokers are equal when their broker names are equal.
-     * NOTE(review): the dataverse name is not compared, although brokers are looked up by
-     * (dataverse, broker name) - confirm whether that is intended.
-     */
-    @Override
-    public boolean equals(Object other) {
-        if (this == other) {
-            return true;
-        }
-        if (!(other instanceof Broker)) {
-            return false;
-        }
-        Broker that = (Broker) other;
-        return that.brokerName.equals(brokerName);
-    }
-
-    /**
-     * Hashes the broker name only, so that equal brokers always share a hash code.
-     */
-    @Override
-    public int hashCode() {
-        return brokerName == null ? 0 : brokerName.hashCode();
-    }
-
-    @Override
-    public ExtensionMetadataDatasetId getDatasetId() {
-        return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java b/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
deleted file mode 100644
index b73e9e3..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.metadata;
-
-import org.apache.asterix.metadata.MetadataNode;
-import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
-import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-/**
- * Search key for the broker extension metadata dataset, built from a dataverse name and a
- * broker name.
- */
-public class BrokerSearchKey implements IExtensionMetadataSearchKey {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String dataverse;
-    private final String broker;
-
-    public BrokerSearchKey(String dataverse, String broker) {
-        this.broker = broker;
-        this.dataverse = dataverse;
-    }
-
-    /**
-     * @return a tuple holding the dataverse name followed by the broker name
-     */
-    @Override
-    public ITupleReference getSearchKey() {
-        final ITupleReference key = MetadataNode.createTuple(dataverse, broker);
-        return key;
-    }
-
-    /**
-     * @return the id of the broker metadata dataset this key searches
-     */
-    @Override
-    public ExtensionMetadataDatasetId getDatasetId() {
-        return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
deleted file mode 100644
index 0a37c02..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.asterix.bad.metadata;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AString;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-/**
- * Translates a Broker metadata entity to an ITupleReference and vice versa.
- */
-public class BrokerTupleTranslator extends AbstractTupleTranslator<Broker> {
-
-    // Positions of the fields of a serialized Broker inside a tuple.
-    // Key fields.
-    public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
-    public static final int BROKER_NAME_FIELD_INDEX = 1;
-    // Payload field holding the serialized broker record.
-    public static final int BROKER_PAYLOAD_TUPLE_FIELD_INDEX = 2;
-
-    @SuppressWarnings("unchecked")
-    private final ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BADMetadataRecordTypes.BROKER_RECORDTYPE);
-
-    public BrokerTupleTranslator(boolean getTuple) {
-        super(getTuple, BADMetadataIndexes.NUM_FIELDS_BROKER_IDX);
-    }
-
-    /**
-     * Deserializes the broker record stored in the payload field of the given tuple.
-     */
-    @Override
-    public Broker getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
-        final int payload = BROKER_PAYLOAD_TUPLE_FIELD_INDEX;
-        DataInput recordInput = new DataInputStream(new ByteArrayInputStream(frameTuple.getFieldData(payload),
-                frameTuple.getFieldStart(payload), frameTuple.getFieldLength(payload)));
-        return createBrokerFromARecord(recordSerDes.deserialize(recordInput));
-    }
-
-    // Builds a Broker from the dataverse-name, broker-name and end-point fields of the record.
-    private Broker createBrokerFromARecord(ARecord brokerRecord) {
-        String dataverseName = stringAt(brokerRecord, BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX);
-        String brokerName = stringAt(brokerRecord, BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX);
-        String endPointName = stringAt(brokerRecord, BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX);
-        return new Broker(dataverseName, brokerName, endPointName);
-    }
-
-    // Reads the string value stored at the given position of the record.
-    private static String stringAt(ARecord record, int position) {
-        AString value = (AString) record.getValueByPos(position);
-        return value.getStringValue();
-    }
-
-    /**
-     * Serializes the broker as a tuple: the two key fields (dataverse name, broker name)
-     * followed by the broker record as payload.
-     */
-    @Override
-    public ITupleReference getTupleFromMetadataEntity(Broker broker) throws IOException, MetadataException {
-        // the key comes first in the tuple
-        tupleBuilder.reset();
-        appendKeyField(broker.getDataverseName());
-        appendKeyField(broker.getBrokerName());
-
-        // then the record with fields 0, 1 and 2
-        recordBuilder.reset(BADMetadataRecordTypes.BROKER_RECORDTYPE);
-        addRecordField(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX, broker.getDataverseName());
-        addRecordField(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX, broker.getBrokerName());
-        addRecordField(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX, broker.getEndPointName());
-
-        // write the record as the last tuple field
-        recordBuilder.write(tupleBuilder.getDataOutput(), true);
-        tupleBuilder.addFieldEndOffset();
-
-        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-        return tuple;
-    }
-
-    // Serializes one string straight into the tuple and closes that tuple field.
-    private void appendKeyField(String value) throws IOException, MetadataException {
-        aString.setValue(value);
-        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
-        tupleBuilder.addFieldEndOffset();
-    }
-
-    // Serializes one string into the scratch buffer and adds it to the record at the given position.
-    private void addRecordField(int position, String value) throws IOException, MetadataException {
-        fieldValue.reset();
-        aString.setValue(value);
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(position, fieldValue);
-    }
-}
\ No newline at end of file


Mime
View raw message