asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [7/7] asterixdb-bad git commit: Updated to match code changes to asterix
Date Wed, 07 Dec 2016 20:59:45 GMT
Updated to match code changes to asterix

Added Procedure Langauge and Metadata
Restructured to fit with bom pom
Added ChannelJobService for execution tasks
Added string constants file
Added BAD Rewrite Rule Set

Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/d0ec8377
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/d0ec8377
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/d0ec8377

Branch: refs/heads/master
Commit: d0ec837761d1e679ee0d8ac57adec12c0d9d4e8b
Parents: f73ca84
Author: Steven Glenn Jacobs <sjaco002@ucr.edu>
Authored: Wed Dec 7 11:00:08 2016 -0800
Committer: Steven Glenn Jacobs <sjaco002@ucr.edu>
Committed: Wed Dec 7 11:00:08 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 asterix-bad/pom.xml                             | 218 ++++++++++
 .../org/apache/asterix/bad/BADConstants.java    |  56 +++
 .../org/apache/asterix/bad/ChannelJobInfo.java  |  48 +++
 .../apache/asterix/bad/ChannelJobService.java   | 194 +++++++++
 .../bad/lang/BADCompilationProvider.java        |  58 +++
 .../asterix/bad/lang/BADLangExtension.java      | 122 ++++++
 .../asterix/bad/lang/BADParserFactory.java      |  38 ++
 .../bad/lang/BADQueryTranslatorExtension.java   |  51 +++
 .../bad/lang/BADQueryTranslatorFactory.java     |  36 ++
 .../asterix/bad/lang/BADRuleSetFactory.java     |  63 +++
 .../asterix/bad/lang/BADStatementExecutor.java  |  69 ++++
 .../bad/lang/statement/BrokerDropStatement.java | 100 +++++
 .../lang/statement/ChannelDropStatement.java    | 175 +++++++++
 .../statement/ChannelSubscribeStatement.java    | 230 +++++++++++
 .../statement/ChannelUnsubscribeStatement.java  | 165 ++++++++
 .../lang/statement/CreateBrokerStatement.java   | 106 +++++
 .../lang/statement/CreateChannelStatement.java  | 393 +++++++++++++++++++
 .../statement/CreateProcedureStatement.java     | 185 +++++++++
 .../bad/metadata/BADMetadataExtension.java      | 121 ++++++
 .../bad/metadata/BADMetadataIndexes.java        |  82 ++++
 .../bad/metadata/BADMetadataRecordTypes.java    | 100 +++++
 .../org/apache/asterix/bad/metadata/Broker.java |  69 ++++
 .../asterix/bad/metadata/BrokerSearchKey.java   |  45 +++
 .../bad/metadata/BrokerTupleTranslator.java     | 119 ++++++
 .../apache/asterix/bad/metadata/Channel.java    |  86 ++++
 .../bad/metadata/ChannelEventsListener.java     | 229 +++++++++++
 .../asterix/bad/metadata/ChannelSearchKey.java  |  45 +++
 .../bad/metadata/ChannelTupleTranslator.java    | 159 ++++++++
 .../bad/metadata/DataverseBrokersSearchKey.java |  43 ++
 .../metadata/DataverseChannelsSearchKey.java    |  43 ++
 .../apache/asterix/bad/metadata/Procedure.java  |  96 +++++
 .../bad/metadata/ProcedureSearchKey.java        |  47 +++
 .../bad/metadata/ProcedureTupleTranslator.java  | 189 +++++++++
 .../InsertBrokerNotifierForChannelRule.java     | 323 +++++++++++++++
 .../bad/runtime/NotifyBrokerOperator.java       |  90 +++++
 .../bad/runtime/NotifyBrokerPOperator.java      | 111 ++++++
 .../bad/runtime/NotifyBrokerRuntime.java        | 139 +++++++
 .../bad/runtime/NotifyBrokerRuntimeFactory.java |  55 +++
 .../RepetitiveChannelOperatorDescriptor.java    |  78 ++++
 .../RepetitiveChannelOperatorNodePushable.java  |  76 ++++
 .../src/main/resources/lang-extension/lang.txt  | 206 ++++++++++
 .../asterix/bad/test/BADExecutionTest.java      |  98 +++++
 .../asterix/bad/test/BADOptimizerTest.java      |  60 +++
 .../conf/asterix-build-configuration.xml        | 110 ++++++
 asterix-bad/src/test/resources/conf/cluster.xml |  49 +++
 .../conf/hyracks-deployment.properties          |  21 +
 .../src/test/resources/conf/test.properties     |  22 ++
 .../queries/channel/channel-create.aql          |  36 ++
 .../queries/channel/channel-subscribe.aql       |  40 ++
 .../queries/channel/channel-unsubscribe.aql     |  38 ++
 .../results/channel/channel-create.plan         |  57 +++
 .../results/channel/channel-subscribe.plan      |  71 ++++
 .../results/channel/channel-unsubscribe.plan    |  71 ++++
 .../create_channel_check_datasets.1.ddl.aql     |  34 ++
 .../create_channel_check_datasets.3.query.aql   |   7 +
 .../create_channel_check_metadata.1.ddl.aql     |  34 ++
 .../create_channel_check_metadata.3.query.aql   |   3 +
 .../drop_channel_check_datasets.1.ddl.aql       |  38 ++
 .../drop_channel_check_datasets.2.ddl.aql       |   3 +
 .../drop_channel_check_datasets.3.query.aql     |   7 +
 .../drop_channel_check_metadata.1.ddl.aql       |  38 ++
 .../drop_channel_check_metadata.2.ddl.aql       |   3 +
 .../drop_channel_check_metadata.3.query.aql     |   5 +
 .../room_occupants/room_occupants.1.ddl.aql     |  56 +++
 .../room_occupants/room_occupants.2.update.aql  |  12 +
 .../room_occupants/room_occupants.3.query.aql   |  15 +
 .../room_occupants/room_occupants.4.update.aql  |  16 +
 .../room_occupants/room_occupants.5.sleep.aql   |   7 +
 .../room_occupants/room_occupants.6.update.aql  |  14 +
 .../room_occupants/room_occupants.7.query.aql   |  12 +
 ...scribe_channel_check_subscriptions.1.ddl.aql |  36 ++
 ...ibe_channel_check_subscriptions.2.update.aql |   3 +
 ...ibe_channel_check_subscriptions.3.update.aql |   3 +
 ...ibe_channel_check_subscriptions.4.update.aql |   3 +
 ...ribe_channel_check_subscriptions.5.query.aql |   5 +
 .../create_channel_check_datasets.1.adm         |   2 +
 .../create_channel_check_metadata.1.adm         |   1 +
 .../drop_channel_check_datasets.1.adm           |   4 +
 .../drop_channel_check_metadata.1.adm           |   2 +
 .../channel/room_occupants/room_occupants.3.adm |   2 +
 .../channel/room_occupants/room_occupants.7.adm |   2 +
 .../subscribe_channel_check_subscriptions.1.adm |   3 +
 .../src/test/resources/runtimets/testsuite.xml  |  37 ++
 asterix-opt-bom/pom.xml                         |  47 +++
 pom.xml                                         | 207 +---------
 .../org/apache/asterix/bad/BADConstants.java    |  48 ---
 .../org/apache/asterix/bad/ChannelJobInfo.java  |  48 ---
 .../apache/asterix/bad/ChannelJobService.java   | 145 -------
 .../bad/lang/BADCompilationProvider.java        |  52 ---
 .../asterix/bad/lang/BADLangExtension.java      | 106 -----
 .../asterix/bad/lang/BADParserFactory.java      |  38 --
 .../bad/lang/BADQueryTranslatorExtension.java   |  53 ---
 .../bad/lang/BADQueryTranslatorFactory.java     |  41 --
 .../asterix/bad/lang/BADStatementExecutor.java  |  45 ---
 .../bad/lang/statement/BrokerDropStatement.java | 100 -----
 .../lang/statement/ChannelDropStatement.java    | 175 ---------
 .../statement/ChannelSubscribeStatement.java    | 209 ----------
 .../statement/ChannelUnsubscribeStatement.java  | 165 --------
 .../lang/statement/CreateBrokerStatement.java   | 106 -----
 .../lang/statement/CreateChannelStatement.java  | 371 -----------------
 .../bad/metadata/BADMetadataExtension.java      | 115 ------
 .../bad/metadata/BADMetadataIndexes.java        |  66 ----
 .../bad/metadata/BADMetadataRecordTypes.java    |  77 ----
 .../org/apache/asterix/bad/metadata/Broker.java |  69 ----
 .../asterix/bad/metadata/BrokerSearchKey.java   |  45 ---
 .../bad/metadata/BrokerTupleTranslator.java     | 118 ------
 .../apache/asterix/bad/metadata/Channel.java    |  86 ----
 .../bad/metadata/ChannelEventsListener.java     | 231 -----------
 .../asterix/bad/metadata/ChannelSearchKey.java  |  45 ---
 .../bad/metadata/ChannelTupleTranslator.java    | 159 --------
 .../InsertBrokerNotifierForChannelRule.java     | 317 ---------------
 .../bad/runtime/NotifyBrokerOperator.java       |  90 -----
 .../bad/runtime/NotifyBrokerPOperator.java      | 111 ------
 .../bad/runtime/NotifyBrokerRuntime.java        | 138 -------
 .../bad/runtime/NotifyBrokerRuntimeFactory.java |  55 ---
 .../RepetitiveChannelOperatorDescriptor.java    |  83 ----
 .../RepetitiveChannelOperatorNodePushable.java  | 125 ------
 src/main/resources/lang-extension/lang.txt      | 178 ---------
 .../asterix/bad/test/BADExecutionTest.java      |  98 -----
 .../asterix/bad/test/BADOptimizerTest.java      |  55 ---
 .../conf/asterix-build-configuration.xml        | 110 ------
 src/test/resources/conf/cluster.xml             |  49 ---
 .../conf/hyracks-deployment.properties          |  21 -
 src/test/resources/conf/test.properties         |  22 --
 .../queries/channel/channel-create.aql          |  36 --
 .../queries/channel/channel-subscribe.aql       |  40 --
 .../queries/channel/channel-unsubscribe.aql     |  38 --
 .../results/channel/channel-create.plan         |  30 --
 .../results/channel/channel-subscribe.plan      |  44 ---
 .../results/channel/channel-unsubscribe.plan    |  44 ---
 .../create_channel_check_datasets.1.ddl.aql     |  34 --
 .../create_channel_check_datasets.3.query.aql   |   7 -
 .../create_channel_check_metadata.1.ddl.aql     |  34 --
 .../create_channel_check_metadata.3.query.aql   |   3 -
 .../drop_channel_check_datasets.1.ddl.aql       |  38 --
 .../drop_channel_check_datasets.2.ddl.aql       |   3 -
 .../drop_channel_check_datasets.3.query.aql     |   7 -
 .../drop_channel_check_metadata.1.ddl.aql       |  38 --
 .../drop_channel_check_metadata.2.ddl.aql       |   3 -
 .../drop_channel_check_metadata.3.query.aql     |   3 -
 .../room_occupants/room_occupants.1.ddl.aql     |  56 ---
 .../room_occupants/room_occupants.2.update.aql  |  12 -
 .../room_occupants/room_occupants.3.query.aql   |  15 -
 .../room_occupants/room_occupants.4.update.aql  |  16 -
 .../room_occupants/room_occupants.5.sleep.aql   |   7 -
 .../room_occupants/room_occupants.6.update.aql  |  14 -
 .../room_occupants/room_occupants.7.query.aql   |  12 -
 ...scribe_channel_check_subscriptions.1.ddl.aql |  34 --
 ...ibe_channel_check_subscriptions.2.update.aql |   7 -
 ...ribe_channel_check_subscriptions.3.query.aql |   5 -
 .../create_channel_check_datasets.1.adm         |   2 -
 .../create_channel_check_metadata.1.adm         |   1 -
 .../drop_channel_check_datasets.1.adm           |   4 -
 .../drop_channel_check_metadata.1.adm           |   2 -
 .../channel/room_occupants/room_occupants.3.adm |   2 -
 .../channel/room_occupants/room_occupants.7.adm |   2 -
 .../subscribe_channel_check_subscriptions.1.adm |   3 -
 src/test/resources/runtimets/testsuite.xml      |  37 --
 159 files changed, 6001 insertions(+), 4990 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index a4ee8e4..42fc47d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ target
 git.properties
 .DS_Store
 *.swp
+build

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
new file mode 100644
index 0000000..1a26d5a
--- /dev/null
+++ b/asterix-bad/pom.xml
@@ -0,0 +1,218 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.    See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.asterix.bad</groupId>
+    <artifactId>asterix-opt</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-bad</artifactId>
+    <properties>
+    <asterix.version>0.8.9-SNAPSHOT</asterix.version>
+  </properties>
+    <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.asterix</groupId>
+        <artifactId>asterix-grammar-extension-maven-plugin</artifactId>
+        <version>${asterix.version}</version>
+        <configuration>
+          <base>${project.basedir}</base>
+          <gbase>../../asterix-lang-aql/src/main/javacc/AQL.jj</gbase>
+          <gextension>src/main/resources/lang-extension/lang.txt</gextension>
+          <output>target/generated-resources/javacc/grammar.jj</output>
+          <parserClassName>BADAQLParser</parserClassName>
+          <packageName>org.apache.asterix.bad.lang</packageName>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>grammarix</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>javacc-maven-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>javacc</id>
+            <goals>
+              <goal>javacc</goal>
+            </goals>
+            <configuration>
+              <isStatic>false</isStatic>
+              <javaUnicodeEscape>true</javaUnicodeEscape>
+              <sourceDirectory>target/generated-resources/javacc</sourceDirectory>
+            </configuration>
+          </execution>
+          <execution>
+            <id>javacc-jjdoc</id>
+            <goals>
+              <goal>jjdoc</goal>
+            </goals>
+            <phase>process-sources</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.9</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.build.directory}/generated-sources/javacc/</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+            <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.asterix</groupId>
+                    <artifactId>asterix-grammar-extension-maven-plugin</artifactId>
+                    <versionRange>[${asterix.version},)</versionRange>
+                    <goals>
+                      <goal>grammarix</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>javacc-maven-plugin</artifactId>
+                    <versionRange>[2.6,)</versionRange>
+                    <goals>
+                      <goal>javacc</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>${asterix.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-test-support</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-runtime</artifactId>
+      <version>${asterix.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-compiler</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-hdfs-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-common</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-test-framework</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-active</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-algebra</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-app</artifactId>
+      <version>${asterix.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-app</artifactId>
+      <version>${asterix.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-common</artifactId>
+      <version>${asterix.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <type>jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
new file mode 100644
index 0000000..a906ae6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+public interface BADConstants {
+    public static final String SubscriptionId = "subscriptionId";
+    public static final String BrokerName = "BrokerName";
+    public static final String ChannelName = "ChannelName";
+    public static final String ProcedureName = "ProcedureName";
+    public static final String DataverseName = "DataverseName";
+    public static final String BrokerEndPoint = "BrokerEndPoint";
+    public static final String DeliveryTime = "deliveryTime";
+    public static final String ResultId = "resultId";
+    public static final String ChannelExecutionTime = "channelExecutionTime";
+    public static final String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+    public static final String ChannelResultsType = "ChannelResultsType";
+    public static final String ResultsDatasetName = "ResultsDatasetName";
+    public static final String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+    public static final String CHANNEL_EXTENSION_NAME = "Channel";
+    public static final String PROCEDURE_KEYWORD = "Procedure";
+    public static final String BROKER_KEYWORD = "Broker";
+    public static final String RECORD_TYPENAME_BROKER = "BrokerRecordType";
+    public static final String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
+    public static final String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
+    public static final String subscriptionEnding = "Subscriptions";
+    public static final String resultsEnding = "Results";
+    public static final String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
+    public static final String BAD_DATAVERSE_NAME = "Metadata";
+    public static final String Duration = "Duration";
+    public static final String Function = "Function";
+    public static final String FIELD_NAME_ARITY = "Arity";
+    public static final String FIELD_NAME_PARAMS = "Params";
+    public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
+    public static final String FIELD_NAME_DEFINITION = "Definition";
+    public static final String FIELD_NAME_LANGUAGE = "Language";
+
+    public enum ChannelJobType {
+        REPETITIVE
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
new file mode 100644
index 0000000..da0c43b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants.ChannelJobType;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class ChannelJobInfo extends ActiveJob {
+
+    private static final long serialVersionUID = 1L;
+    private List<String> locations;
+
+    public ChannelJobInfo(EntityId entityId, JobId jobId, ActivityState state, JobSpecification spec) {
+        super(entityId, jobId, state, ChannelJobType.REPETITIVE, spec);
+    }
+
+    public List<String> getLocations() {
+        return locations;
+
+    }
+
+    public void setLocations(List<String> locations) {
+        this.locations = locations;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
new file mode 100644
index 0000000..d1df438
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AUUID;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Provides functionality for running channel jobs and communicating with Brokers
+ */
+public class ChannelJobService {
+
+    private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
+
+    public static ScheduledExecutorService startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
+            IHyracksClientConnection hcc, long duration)
+            throws Exception {
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    executeJob(jobSpec, jobFlags, jobId, hcc);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Channel Job Failed to run.", e);
+                }
+            }
+        }, duration, duration, TimeUnit.MILLISECONDS);
+        return scheduledExecutorService;
+    }
+
+    public static void executeJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
+            IHyracksClientConnection hcc)
+            throws Exception {
+        LOGGER.info("Executing Channel Job");
+        if (jobId == null) {
+            hcc.startJob(jobSpec, jobFlags);
+        } else {
+            hcc.startJob(jobSpec, jobFlags, jobId);
+        }
+    }
+
+    public static void runChannelJob(JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
+        JobId jobId = hcc.startJob(channeljobSpec);
+        hcc.waitForCompletion(jobId);
+    }
+
+    public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
+            AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
+        String formattedString;
+            formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
+        sendMessage(brokerEndpoint, formattedString);
+    }
+
+    public static String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime) {
+        String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\""
+                + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+                + channelExecutionTime + "\", \"subscriptionIds\":[";
+        for (int i = 0; i < subscriptionIds.size(); i++) {
+            AUUID subId = (AUUID) subscriptionIds.getItem(i);
+            String subString = subId.toSimpleString();
+            JSON += "\"" + subString + "\"";
+            if (i < subscriptionIds.size() - 1) {
+                JSON += ",";
+            }
+        }
+        JSON += "]}";
+        return JSON;
+
+    }
+
+    public static long findPeriod(String duration) {
+        //TODO: Allow Repetitive Channels to use YMD durations
+        String hoursMinutesSeconds = "";
+        if (duration.indexOf('T') != -1) {
+            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
+        }
+        double seconds = 0;
+        if (hoursMinutesSeconds != "") {
+            int pos = 0;
+            if (hoursMinutesSeconds.indexOf('H') != -1) {
+                Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
+                seconds += (hours * 60 * 60);
+                pos = hoursMinutesSeconds.indexOf('H') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('M') != -1) {
+                Double minutes =
+                        Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
+                seconds += (minutes * 60);
+                pos = hoursMinutesSeconds.indexOf('M') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('S') != -1) {
+                Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
+                seconds += (s);
+            }
+        }
+        return (long) (seconds * 1000);
+    }
+
+    public static void sendMessage(String targetURL, String urlParameters) {
+        HttpURLConnection connection = null;
+        try {
+            //Create connection
+            URL url = new URL(targetURL);
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("POST");
+            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+
+            connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
+            connection.setRequestProperty("Content-Language", "en-US");
+
+            connection.setUseCaches(false);
+            connection.setDoOutput(true);
+
+            if (connection.getOutputStream() != null) {
+                //Send message
+                DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
+                wr.writeBytes(urlParameters);
+                wr.close();
+            } else {
+                LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                int responseCode = connection.getResponseCode();
+                LOGGER.info("\nSending 'POST' request to URL : " + url);
+                LOGGER.info("Post parameters : " + urlParameters);
+                LOGGER.info("Response Code : " + responseCode);
+            }
+
+            if (connection.getInputStream() != null) {
+                BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+                String inputLine;
+                StringBuffer response = new StringBuffer();
+                while ((inputLine = in.readLine()) != null) {
+                    response.append(inputLine);
+                }
+                in.close();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.log(Level.INFO, response.toString());
+                }
+            } else {
+                LOGGER.log(Level.WARNING, "Channel Failed to get response from Broker.");
+            }
+
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
+        } finally {
+            if (connection != null) {
+                connection.disconnect();
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelJobService";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
new file mode 100644
index 0000000..0a6ced2
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.IRuleSetFactory;
+import org.apache.asterix.lang.aql.rewrites.AQLRewriterFactory;
+import org.apache.asterix.lang.aql.visitor.AQLAstPrintVisitorFactory;
+import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
+import org.apache.asterix.lang.common.base.IParserFactory;
+import org.apache.asterix.lang.common.base.IRewriterFactory;
+import org.apache.asterix.translator.AqlExpressionToPlanTranslatorFactory;
+
+public class BADCompilationProvider implements ILangCompilationProvider {
+
+    @Override
+    public IParserFactory getParserFactory() {
+        return new BADParserFactory();
+    }
+
+    @Override
+    public IRewriterFactory getRewriterFactory() {
+        return new AQLRewriterFactory();
+    }
+
+    @Override
+    public IAstPrintVisitorFactory getAstPrintVisitorFactory() {
+        return new AQLAstPrintVisitorFactory();
+    }
+
+    @Override
+    public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
+        return new AqlExpressionToPlanTranslatorFactory();
+    }
+
+    @Override
+    public IRuleSetFactory getRuleSetFactory() {
+        return new BADRuleSetFactory();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
new file mode 100644
index 0000000..959600f
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.BrokerSearchKey;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelSearchKey;
+import org.apache.asterix.bad.metadata.DataverseBrokersSearchKey;
+import org.apache.asterix.bad.metadata.DataverseChannelsSearchKey;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.bad.metadata.ProcedureSearchKey;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class BADLangExtension implements ILangExtension {
+
+    public static final ExtensionId EXTENSION_ID = new ExtensionId(BADLangExtension.class.getSimpleName(), 0);
+
+    @Override
+    public ExtensionId getId() {
+        return EXTENSION_ID;
+    }
+
+    @Override
+    public void configure(List<Pair<String, String>> args) {
+    }
+
+    @Override
+    public ILangCompilationProvider getLangCompilationProvider(Language lang) {
+        switch (lang) {
+            case AQL:
+                return new BADCompilationProvider();
+            case SQLPP:
+                return new SqlppCompilationProvider();
+            default:
+                return null;
+        }
+    }
+
+    @Override
+    public ExtensionKind getExtensionKind() {
+        return ExtensionKind.LANG;
+    }
+
+
+    public static Broker getBroker(MetadataTransactionContext mdTxnCtx, String dataverseName, String brokerName)
+            throws AlgebricksException {
+        BrokerSearchKey brokerSearchKey = new BrokerSearchKey(dataverseName, brokerName);
+        List<Broker> brokers = MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
+        if (brokers.isEmpty()) {
+            return null;
+        } else if (brokers.size() > 1) {
+            throw new AlgebricksException("Broker search key returned more than one broker");
+        } else {
+            return brokers.get(0);
+        }
+    }
+
+    public static Channel getChannel(MetadataTransactionContext mdTxnCtx, String dataverseName, String channelName)
+            throws AlgebricksException {
+        ChannelSearchKey channelSearchKey = new ChannelSearchKey(dataverseName, channelName);
+        List<Channel> channels = MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
+        if (channels.isEmpty()) {
+            return null;
+        } else if (channels.size() > 1) {
+            throw new AlgebricksException("Channel search key returned more than one channel");
+        } else {
+            return channels.get(0);
+        }
+    }
+
+    public static Procedure getProcedure(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String procedureName, String arity) throws AlgebricksException {
+        ProcedureSearchKey procedureSearchKey = new ProcedureSearchKey(dataverseName, procedureName, arity);
+        List<Procedure> procedures = MetadataManager.INSTANCE.getEntities(mdTxnCtx, procedureSearchKey);
+        if (procedures.isEmpty()) {
+            return null;
+        } else if (procedures.size() > 1) {
+            throw new AlgebricksException("Procedure search key returned more than one channel");
+        } else {
+            return procedures.get(0);
+        }
+    }
+
+    public static List<Broker> getBrokers(MetadataTransactionContext mdTxnCtx, String dataverseName)
+            throws AlgebricksException {
+        DataverseBrokersSearchKey brokerSearchKey = new DataverseBrokersSearchKey(dataverseName);
+        return MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
+    }
+
+    public static List<Channel> getChannels(MetadataTransactionContext mdTxnCtx, String dataverseName)
+            throws AlgebricksException {
+        DataverseChannelsSearchKey channelSearchKey = new DataverseChannelsSearchKey(dataverseName);
+        return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
new file mode 100644
index 0000000..58bca17
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.io.Reader;
+
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
+
+public class BADParserFactory implements IParserFactory {
+
+    @Override
+    public IParser createParser(String query) {
+        return new BADAQLParser(query);
+    }
+
+    @Override
+    public IParser createParser(Reader reader) {
+        return new BADAQLParser(reader);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
new file mode 100644
index 0000000..20519dd
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.cc.IStatementExecutorExtension;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class BADQueryTranslatorExtension implements IStatementExecutorExtension {
+
+    public static final ExtensionId BAD_QUERY_TRANSLATOR_EXTENSION_ID = new ExtensionId(
+            BADQueryTranslatorExtension.class.getSimpleName(), 0);
+
+    private static class LazyHolder {
+        private static final IStatementExecutorFactory INSTANCE = new BADQueryTranslatorFactory();
+
+    }
+
+    @Override
+    public ExtensionId getId() {
+        return BAD_QUERY_TRANSLATOR_EXTENSION_ID;
+    }
+
+    @Override
+    public void configure(List<Pair<String, String>> args) {
+    }
+
+    @Override
+    public IStatementExecutorFactory getQueryTranslatorFactory() {
+        return LazyHolder.INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
new file mode 100644
index 0000000..958b14f
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.SessionConfig;
+
+public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
+
+    @Override
+    public QueryTranslator create(List<Statement> statements, SessionConfig conf,
+            ILangCompilationProvider compilationProvider) {
+        return new BADStatementExecutor(statements, conf, compilationProvider);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
new file mode 100644
index 0000000..31d8cd0
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
+import org.apache.asterix.compiler.provider.IRuleSetFactory;
+import org.apache.asterix.optimizer.base.RuleCollections;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
+import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class BADRuleSetFactory implements IRuleSetFactory {
+
+    @Override
+    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites()
+            throws AlgebricksException {
+        List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet = DefaultRuleSetFactory.buildLogical();
+        if (logicalRuleSet.size() != 14) {
+            throw new AlgebricksException("Incorrect RuleSet");
+        }
+        List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection();
+
+        for (int i = 0; i < normalizationCollection.size(); i++) {
+            IAlgebraicRewriteRule rule = normalizationCollection.get(i);
+            if (rule instanceof UnnestToDataScanRule) {
+                normalizationCollection.add(i + 1, new InsertBrokerNotifierForChannelRule());
+                break;
+            }
+        }
+        SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
+        logicalRuleSet.set(3, new Pair<>(seqOnceCtrl, normalizationCollection));
+        logicalRuleSet.set(7, new Pair<>(seqOnceCtrl, normalizationCollection));
+        return logicalRuleSet;
+    }
+
+    @Override
+    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites() {
+        return DefaultRuleSetFactory.buildPhysical();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
new file mode 100644
index 0000000..fa18867
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang;
+
+import java.util.List;
+
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.DataverseDropStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+
+public class BADStatementExecutor extends QueryTranslator {
+
+    public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
+            ILangCompilationProvider compliationProvider) {
+        super(aqlStatements, conf, compliationProvider);
+    }
+
+
+    @Override
+    protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        //TODO: Remove this when metadata dependencies are in place
+        //TODO: Stop dataset drop when dataset used by channel
+        super.handleDataverseDropStatement(metadataProvider, stmt, hcc);
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
+        List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
+        for (Broker broker : brokers) {
+            BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
+            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+        }
+        List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
+        for (Channel channel : channels) {
+            ChannelDropStatement drop = new ChannelDropStatement(dvId,
+                    new Identifier(channel.getChannelId().getEntityName()), false);
+            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+        }
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
new file mode 100644
index 0000000..7894c44
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class BrokerDropStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier brokerName;
+    private boolean ifExists;
+
+    public BrokerDropStatement(Identifier dataverseName, Identifier brokerName, boolean ifExists) {
+        this.brokerName = brokerName;
+        this.dataverseName = dataverseName;
+        this.ifExists = ifExists;
+    }
+
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getBrokerName() {
+        return brokerName;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        //TODO: dont drop a broker that's being used
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            Broker broker = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue());
+            if (broker == null) {
+                throw new AlgebricksException("A broker with this name " + brokerName + " doesn't exist.");
+            }
+            MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, broker);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            QueryTranslator.abort(e, e, mdTxnCtx);
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
new file mode 100644
index 0000000..6811ef2
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorNodePushable;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.lang.common.statement.DropDatasetStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelDropStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private boolean ifExists;
+
+    public ChannelDropStatement(Identifier dataverseName, Identifier channelName, boolean ifExists) {
+        this.dataverseName = dataverseName;
+        this.channelName = channelName;
+        this.ifExists = ifExists;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+        boolean txnActive = false;
+        EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
+        ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+                .getActiveEntityListener(entityId);
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+        boolean subscriberRegistered = false;
+        Channel channel = null;
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            txnActive = true;
+            channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+            txnActive = false;
+            if (channel == null) {
+                if (ifExists) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("There is no channel with this name " + channelName + ".");
+                }
+            }
+            if (listener != null) {
+                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+            }
+            if (!subscriberRegistered) {
+                throw new AsterixException("Channel " + channelName + " is not running");
+            }
+
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
+                    .getMessageBroker();
+
+            ChannelJobInfo cInfo = listener.getJobInfo(channel.getChannelId());;
+            Set<String> ncs = new HashSet<>(cInfo.getLocations());
+            AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
+                    ncs.toArray(new String[ncs.size()]));
+            int partition = 0;
+            for (String location : locations.getLocations()) {
+                messageBroker.sendApplicationMessageToNC(
+                        new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "cc",
+                                new ActiveRuntimeId(channel.getChannelId(),
+                                        RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition++)),
+                        location);
+            }
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_ENDED);
+
+            //Drop the Channel Datasets
+            //TODO: Need to find some way to handle if this fails.
+            //TODO: Prevent datasets for Channels from being dropped elsewhere
+            DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+                    new Identifier(channel.getResultsDatasetName()), true);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+            dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+                    new Identifier(channel.getSubscriptionsDataset()), true);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+            if (subscriberRegistered) {
+                listener.deregisterEventSubscriber(eventSubscriber);
+            }
+
+            //Remove the Channel Metadata
+            MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (txnActive) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
new file mode 100644
index 0000000..dc10742
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.expression.FLWOGRExpression;
+import org.apache.asterix.lang.common.base.Clause;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
+import org.apache.asterix.lang.common.expression.FieldBinding;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.UpsertStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelSubscribeStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private final Identifier brokerDataverseName;
+    private final Identifier brokerName;
+    private final List<Expression> argList;
+    private final String subscriptionId;
+    private final int varCounter;
+
+    public ChannelSubscribeStatement(Identifier dataverseName, Identifier channelName, List<Expression> argList,
+            int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId) {
+        this.channelName = channelName;
+        this.dataverseName = dataverseName;
+        this.brokerDataverseName = brokerDataverseName;
+        this.brokerName = brokerName;
+        this.argList = argList;
+        this.subscriptionId = subscriptionId;
+        this.varCounter = varCounter;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getBrokerDataverseName() {
+        return brokerDataverseName;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    public Identifier getBrokerName() {
+        return brokerName;
+    }
+
+    public List<Expression> getArgList() {
+        return argList;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.QUERY;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+        String brokerDataverse = ((QueryTranslator) statementExecutor)
+.getActiveDataverse(brokerDataverseName);
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+            Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+            if (channel == null) {
+                throw new AsterixException("There is no channel with this name " + channelName + ".");
+            }
+            Broker broker = BADLangExtension.getBroker(mdTxnCtx, brokerDataverse, brokerName.getValue());
+            if (broker == null) {
+                throw new AsterixException("There is no broker with this name " + brokerName + ".");
+            }
+
+            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+
+            if (argList.size() != channel.getFunction().getArity()) {
+                throw new AsterixException("Channel expected " + channel.getFunction().getArity()
+                        + " parameters but got " + argList.size());
+            }
+
+            Query subscriptionTuple = new Query(false);
+
+            List<FieldBinding> fb = new ArrayList<FieldBinding>();
+            LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
+            Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
+            fb.add(new FieldBinding(leftExpr, rightExpr));
+
+            leftExpr = new LiteralExpr(new StringLiteral(BADConstants.BrokerName));
+            rightExpr = new LiteralExpr(new StringLiteral(broker.getBrokerName()));
+            fb.add(new FieldBinding(leftExpr, rightExpr));
+
+            if (subscriptionId != null) {
+                leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
+
+                List<Expression> UUIDList = new ArrayList<Expression>();
+                UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
+                FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
+                FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
+                        function.getArity());
+                CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
+
+                rightExpr = UUIDCall;
+                fb.add(new FieldBinding(leftExpr, rightExpr));
+            }
+
+            for (int i = 0; i < argList.size(); i++) {
+                leftExpr = new LiteralExpr(new StringLiteral("param" + i));
+                rightExpr = argList.get(i);
+                fb.add(new FieldBinding(leftExpr, rightExpr));
+            }
+            RecordConstructor recordCon = new RecordConstructor(fb);
+            subscriptionTuple.setBody(recordCon);
+
+            subscriptionTuple.setVarCounter(varCounter);
+
+            if (subscriptionId == null) {
+
+                VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
+                VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
+                VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
+                VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
+                useResultVar.setIsNewVar(false);
+                useSubscriptionVar.setIsNewVar(false);
+                Query returnQuery = new Query(false);
+                List<Clause> clauseList = new ArrayList<>();
+                LetClause let = new LetClause(subscriptionVar,
+                        new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId)));
+                clauseList.add(let);
+                FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar);
+                returnQuery.setBody(body);
+
+                metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+                metadataProvider.setResultAsyncMode(
+                        resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                InsertStatement insert = new InsertStatement(new Identifier(dataverse),
+                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar,
+                        returnQuery);
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc,
+                        resultDelivery, stats, false);
+            } else {
+                UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
+                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc,
+                        resultDelivery, stats, false);
+            }
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            QueryTranslator.abort(e, e, mdTxnCtx);
+            throw new HyracksDataException(e);
+        }
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
new file mode 100644
index 0000000..17a54ec
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.OperatorExpr;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DeleteStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ChannelUnsubscribeStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private final String subscriptionId;
+    private final int varCounter;
+    private VariableExpr vars;
+    private List<String> dataverses;
+    private List<String> datasets;
+
+    public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName,
+            String subscriptionId, int varCounter, List<String> dataverses, List<String> datasets) {
+        this.vars = vars;
+        this.channelName = channelName;
+        this.dataverseName = dataverseName;
+        this.subscriptionId = subscriptionId;
+        this.varCounter = varCounter;
+        this.dataverses = dataverses;
+        this.datasets = datasets;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public VariableExpr getVariableExpr() {
+        return vars;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    public String getsubScriptionId() {
+        return subscriptionId;
+    }
+
+    public List<String> getDataverses() {
+        return dataverses;
+    }
+
+    public List<String> getDatasets() {
+        return datasets;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+            Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
+            if (channel == null) {
+                throw new AsterixException("There is no channel with this name " + channelName + ".");
+            }
+
+            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+
+            //Need a condition to say subscription-id = sid
+            OperatorExpr condition = new OperatorExpr();
+            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
+            condition.addOperand(fa);
+            condition.setCurrentop(true);
+            condition.addOperator("=");
+
+            List<Expression> UUIDList = new ArrayList<Expression>();
+            UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
+
+            FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
+            FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
+                    function.getArity());
+            CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
+
+            condition.addOperand(UUIDCall);
+
+            DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
+                    new Identifier(subscriptionsDatasetName), condition, varCounter, dataverses, datasets);
+            AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
+            delete.accept(visitor, null);
+
+            ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            QueryTranslator.abort(e, e, mdTxnCtx);
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message