Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 93714200BD8 for ; Wed, 7 Dec 2016 21:59:43 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 91DD6160AF9; Wed, 7 Dec 2016 20:59:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3D25F160B30 for ; Wed, 7 Dec 2016 21:59:41 +0100 (CET) Received: (qmail 2268 invoked by uid 500); 7 Dec 2016 20:59:40 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 2124 invoked by uid 99); 7 Dec 2016 20:59:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Dec 2016 20:59:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 09D43F2151; Wed, 7 Dec 2016 20:59:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjaco002@apache.org To: commits@asterixdb.apache.org Date: Wed, 07 Dec 2016 20:59:45 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [7/7] asterixdb-bad git commit: Updated to match code changes to asterix archived-at: Wed, 07 Dec 2016 20:59:43 -0000 Updated to match code changes to asterix Added Procedure Langauge and Metadata Restructured to fit with bom pom Added ChannelJobService for execution tasks Added string constants file Added BAD Rewrite Rule Set Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/d0ec8377 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/d0ec8377 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/d0ec8377 Branch: refs/heads/master Commit: d0ec837761d1e679ee0d8ac57adec12c0d9d4e8b Parents: f73ca84 Author: Steven Glenn Jacobs Authored: Wed Dec 7 11:00:08 2016 -0800 Committer: Steven Glenn Jacobs Committed: Wed Dec 7 11:00:08 2016 -0800 ---------------------------------------------------------------------- .gitignore | 1 + asterix-bad/pom.xml | 218 ++++++++++ .../org/apache/asterix/bad/BADConstants.java | 56 +++ .../org/apache/asterix/bad/ChannelJobInfo.java | 48 +++ .../apache/asterix/bad/ChannelJobService.java | 194 +++++++++ .../bad/lang/BADCompilationProvider.java | 58 +++ .../asterix/bad/lang/BADLangExtension.java | 122 ++++++ .../asterix/bad/lang/BADParserFactory.java | 38 ++ .../bad/lang/BADQueryTranslatorExtension.java | 51 +++ .../bad/lang/BADQueryTranslatorFactory.java | 36 ++ .../asterix/bad/lang/BADRuleSetFactory.java | 63 +++ .../asterix/bad/lang/BADStatementExecutor.java | 69 ++++ .../bad/lang/statement/BrokerDropStatement.java | 100 +++++ .../lang/statement/ChannelDropStatement.java | 175 +++++++++ .../statement/ChannelSubscribeStatement.java | 230 +++++++++++ .../statement/ChannelUnsubscribeStatement.java | 165 ++++++++ .../lang/statement/CreateBrokerStatement.java | 106 +++++ .../lang/statement/CreateChannelStatement.java | 393 +++++++++++++++++++ .../statement/CreateProcedureStatement.java | 185 +++++++++ .../bad/metadata/BADMetadataExtension.java | 121 ++++++ .../bad/metadata/BADMetadataIndexes.java | 82 ++++ .../bad/metadata/BADMetadataRecordTypes.java | 100 +++++ .../org/apache/asterix/bad/metadata/Broker.java | 69 ++++ .../asterix/bad/metadata/BrokerSearchKey.java | 45 +++ .../bad/metadata/BrokerTupleTranslator.java | 119 ++++++ .../apache/asterix/bad/metadata/Channel.java | 86 ++++ .../bad/metadata/ChannelEventsListener.java | 229 +++++++++++ .../asterix/bad/metadata/ChannelSearchKey.java | 45 +++ .../bad/metadata/ChannelTupleTranslator.java | 159 ++++++++ .../bad/metadata/DataverseBrokersSearchKey.java | 43 ++ .../metadata/DataverseChannelsSearchKey.java | 43 ++ .../apache/asterix/bad/metadata/Procedure.java | 96 +++++ .../bad/metadata/ProcedureSearchKey.java | 47 +++ .../bad/metadata/ProcedureTupleTranslator.java | 189 +++++++++ .../InsertBrokerNotifierForChannelRule.java | 323 +++++++++++++++ .../bad/runtime/NotifyBrokerOperator.java | 90 +++++ .../bad/runtime/NotifyBrokerPOperator.java | 111 ++++++ .../bad/runtime/NotifyBrokerRuntime.java | 139 +++++++ .../bad/runtime/NotifyBrokerRuntimeFactory.java | 55 +++ .../RepetitiveChannelOperatorDescriptor.java | 78 ++++ .../RepetitiveChannelOperatorNodePushable.java | 76 ++++ .../src/main/resources/lang-extension/lang.txt | 206 ++++++++++ .../asterix/bad/test/BADExecutionTest.java | 98 +++++ .../asterix/bad/test/BADOptimizerTest.java | 60 +++ .../conf/asterix-build-configuration.xml | 110 ++++++ asterix-bad/src/test/resources/conf/cluster.xml | 49 +++ .../conf/hyracks-deployment.properties | 21 + .../src/test/resources/conf/test.properties | 22 ++ .../queries/channel/channel-create.aql | 36 ++ .../queries/channel/channel-subscribe.aql | 40 ++ .../queries/channel/channel-unsubscribe.aql | 38 ++ .../results/channel/channel-create.plan | 57 +++ .../results/channel/channel-subscribe.plan | 71 ++++ .../results/channel/channel-unsubscribe.plan | 71 ++++ .../create_channel_check_datasets.1.ddl.aql | 34 ++ .../create_channel_check_datasets.3.query.aql | 7 + .../create_channel_check_metadata.1.ddl.aql | 34 ++ .../create_channel_check_metadata.3.query.aql | 3 + .../drop_channel_check_datasets.1.ddl.aql | 38 ++ .../drop_channel_check_datasets.2.ddl.aql | 3 + .../drop_channel_check_datasets.3.query.aql | 7 + .../drop_channel_check_metadata.1.ddl.aql | 38 ++ .../drop_channel_check_metadata.2.ddl.aql | 3 + .../drop_channel_check_metadata.3.query.aql | 5 + .../room_occupants/room_occupants.1.ddl.aql | 56 +++ .../room_occupants/room_occupants.2.update.aql | 12 + .../room_occupants/room_occupants.3.query.aql | 15 + .../room_occupants/room_occupants.4.update.aql | 16 + .../room_occupants/room_occupants.5.sleep.aql | 7 + .../room_occupants/room_occupants.6.update.aql | 14 + .../room_occupants/room_occupants.7.query.aql | 12 + ...scribe_channel_check_subscriptions.1.ddl.aql | 36 ++ ...ibe_channel_check_subscriptions.2.update.aql | 3 + ...ibe_channel_check_subscriptions.3.update.aql | 3 + ...ibe_channel_check_subscriptions.4.update.aql | 3 + ...ribe_channel_check_subscriptions.5.query.aql | 5 + .../create_channel_check_datasets.1.adm | 2 + .../create_channel_check_metadata.1.adm | 1 + .../drop_channel_check_datasets.1.adm | 4 + .../drop_channel_check_metadata.1.adm | 2 + .../channel/room_occupants/room_occupants.3.adm | 2 + .../channel/room_occupants/room_occupants.7.adm | 2 + .../subscribe_channel_check_subscriptions.1.adm | 3 + .../src/test/resources/runtimets/testsuite.xml | 37 ++ asterix-opt-bom/pom.xml | 47 +++ pom.xml | 207 +--------- .../org/apache/asterix/bad/BADConstants.java | 48 --- .../org/apache/asterix/bad/ChannelJobInfo.java | 48 --- .../apache/asterix/bad/ChannelJobService.java | 145 ------- .../bad/lang/BADCompilationProvider.java | 52 --- .../asterix/bad/lang/BADLangExtension.java | 106 ----- .../asterix/bad/lang/BADParserFactory.java | 38 -- .../bad/lang/BADQueryTranslatorExtension.java | 53 --- .../bad/lang/BADQueryTranslatorFactory.java | 41 -- .../asterix/bad/lang/BADStatementExecutor.java | 45 --- .../bad/lang/statement/BrokerDropStatement.java | 100 ----- .../lang/statement/ChannelDropStatement.java | 175 --------- .../statement/ChannelSubscribeStatement.java | 209 ---------- .../statement/ChannelUnsubscribeStatement.java | 165 -------- .../lang/statement/CreateBrokerStatement.java | 106 ----- .../lang/statement/CreateChannelStatement.java | 371 ----------------- .../bad/metadata/BADMetadataExtension.java | 115 ------ .../bad/metadata/BADMetadataIndexes.java | 66 ---- .../bad/metadata/BADMetadataRecordTypes.java | 77 ---- .../org/apache/asterix/bad/metadata/Broker.java | 69 ---- .../asterix/bad/metadata/BrokerSearchKey.java | 45 --- .../bad/metadata/BrokerTupleTranslator.java | 118 ------ .../apache/asterix/bad/metadata/Channel.java | 86 ---- .../bad/metadata/ChannelEventsListener.java | 231 ----------- .../asterix/bad/metadata/ChannelSearchKey.java | 45 --- .../bad/metadata/ChannelTupleTranslator.java | 159 -------- .../InsertBrokerNotifierForChannelRule.java | 317 --------------- .../bad/runtime/NotifyBrokerOperator.java | 90 ----- .../bad/runtime/NotifyBrokerPOperator.java | 111 ------ .../bad/runtime/NotifyBrokerRuntime.java | 138 ------- .../bad/runtime/NotifyBrokerRuntimeFactory.java | 55 --- .../RepetitiveChannelOperatorDescriptor.java | 83 ---- .../RepetitiveChannelOperatorNodePushable.java | 125 ------ src/main/resources/lang-extension/lang.txt | 178 --------- .../asterix/bad/test/BADExecutionTest.java | 98 ----- .../asterix/bad/test/BADOptimizerTest.java | 55 --- .../conf/asterix-build-configuration.xml | 110 ------ src/test/resources/conf/cluster.xml | 49 --- .../conf/hyracks-deployment.properties | 21 - src/test/resources/conf/test.properties | 22 -- .../queries/channel/channel-create.aql | 36 -- .../queries/channel/channel-subscribe.aql | 40 -- .../queries/channel/channel-unsubscribe.aql | 38 -- .../results/channel/channel-create.plan | 30 -- .../results/channel/channel-subscribe.plan | 44 --- .../results/channel/channel-unsubscribe.plan | 44 --- .../create_channel_check_datasets.1.ddl.aql | 34 -- .../create_channel_check_datasets.3.query.aql | 7 - .../create_channel_check_metadata.1.ddl.aql | 34 -- .../create_channel_check_metadata.3.query.aql | 3 - .../drop_channel_check_datasets.1.ddl.aql | 38 -- .../drop_channel_check_datasets.2.ddl.aql | 3 - .../drop_channel_check_datasets.3.query.aql | 7 - .../drop_channel_check_metadata.1.ddl.aql | 38 -- .../drop_channel_check_metadata.2.ddl.aql | 3 - .../drop_channel_check_metadata.3.query.aql | 3 - .../room_occupants/room_occupants.1.ddl.aql | 56 --- .../room_occupants/room_occupants.2.update.aql | 12 - .../room_occupants/room_occupants.3.query.aql | 15 - .../room_occupants/room_occupants.4.update.aql | 16 - .../room_occupants/room_occupants.5.sleep.aql | 7 - .../room_occupants/room_occupants.6.update.aql | 14 - .../room_occupants/room_occupants.7.query.aql | 12 - ...scribe_channel_check_subscriptions.1.ddl.aql | 34 -- ...ibe_channel_check_subscriptions.2.update.aql | 7 - ...ribe_channel_check_subscriptions.3.query.aql | 5 - .../create_channel_check_datasets.1.adm | 2 - .../create_channel_check_metadata.1.adm | 1 - .../drop_channel_check_datasets.1.adm | 4 - .../drop_channel_check_metadata.1.adm | 2 - .../channel/room_occupants/room_occupants.3.adm | 2 - .../channel/room_occupants/room_occupants.7.adm | 2 - .../subscribe_channel_check_subscriptions.1.adm | 3 - src/test/resources/runtimets/testsuite.xml | 37 -- 159 files changed, 6001 insertions(+), 4990 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index a4ee8e4..42fc47d 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ target git.properties .DS_Store *.swp +build http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/pom.xml ---------------------------------------------------------------------- diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml new file mode 100644 index 0000000..1a26d5a --- /dev/null +++ b/asterix-bad/pom.xml @@ -0,0 +1,218 @@ + + + 4.0.0 + + org.apache.asterix.bad + asterix-opt + 1.0.0-SNAPSHOT + + asterix-bad + + 0.8.9-SNAPSHOT + + + + + org.apache.asterix + asterix-grammar-extension-maven-plugin + ${asterix.version} + + ${project.basedir} + ../../asterix-lang-aql/src/main/javacc/AQL.jj + src/main/resources/lang-extension/lang.txt + target/generated-resources/javacc/grammar.jj + BADAQLParser + org.apache.asterix.bad.lang + + + + generate-sources + + grammarix + + + + + + org.codehaus.mojo + javacc-maven-plugin + 2.6 + + + javacc + + javacc + + + false + true + target/generated-resources/javacc + + + + javacc-jjdoc + + jjdoc + + process-sources + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.9 + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/javacc/ + + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.asterix + asterix-grammar-extension-maven-plugin + [${asterix.version},) + + grammarix + + + + + + + + + org.codehaus.mojo + javacc-maven-plugin + [2.6,) + + javacc + + + + + + + + + + + + + + + + org.apache.asterix + asterix-om + ${asterix.version} + jar + compile + + + org.apache.hyracks + hyracks-test-support + test + + + org.apache.asterix + asterix-runtime + ${asterix.version} + jar + compile + + + org.apache.hyracks + algebricks-compiler + + + org.apache.hyracks + hyracks-hdfs-core + + + org.apache.asterix + asterix-common + ${asterix.version} + + + org.apache.asterix + asterix-test-framework + ${asterix.version} + + + org.apache.asterix + asterix-active + ${asterix.version} + + + org.apache.asterix + asterix-algebra + ${asterix.version} + + + org.apache.asterix + asterix-app + ${asterix.version} + jar + compile + + + org.apache.asterix + asterix-app + ${asterix.version} + test-jar + test + + + org.apache.asterix + asterix-common + ${asterix.version} + test-jar + test + + + org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + jar + test + + + http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java new file mode 100644 index 0000000..a906ae6 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad; + +public interface BADConstants { + public static final String SubscriptionId = "subscriptionId"; + public static final String BrokerName = "BrokerName"; + public static final String ChannelName = "ChannelName"; + public static final String ProcedureName = "ProcedureName"; + public static final String DataverseName = "DataverseName"; + public static final String BrokerEndPoint = "BrokerEndPoint"; + public static final String DeliveryTime = "deliveryTime"; + public static final String ResultId = "resultId"; + public static final String ChannelExecutionTime = "channelExecutionTime"; + public static final String ChannelSubscriptionsType = "ChannelSubscriptionsType"; + public static final String ChannelResultsType = "ChannelResultsType"; + public static final String ResultsDatasetName = "ResultsDatasetName"; + public static final String SubscriptionsDatasetName = "SubscriptionsDatasetName"; + public static final String CHANNEL_EXTENSION_NAME = "Channel"; + public static final String PROCEDURE_KEYWORD = "Procedure"; + public static final String BROKER_KEYWORD = "Broker"; + public static final String RECORD_TYPENAME_BROKER = "BrokerRecordType"; + public static final String RECORD_TYPENAME_CHANNEL = "ChannelRecordType"; + public static final String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType"; + public static final String subscriptionEnding = "Subscriptions"; + public static final String resultsEnding = "Results"; + public static final String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension"; + public static final String BAD_DATAVERSE_NAME = "Metadata"; + public static final String Duration = "Duration"; + public static final String Function = "Function"; + public static final String FIELD_NAME_ARITY = "Arity"; + public static final String FIELD_NAME_PARAMS = "Params"; + public static final String FIELD_NAME_RETURN_TYPE = "ReturnType"; + public static final String FIELD_NAME_DEFINITION = "Definition"; + public static final String FIELD_NAME_LANGUAGE = "Language"; + + public enum ChannelJobType { + REPETITIVE + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java new file mode 100644 index 0000000..da0c43b --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad; + +import java.util.List; + +import org.apache.asterix.active.ActiveJob; +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.bad.BADConstants.ChannelJobType; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; + +public class ChannelJobInfo extends ActiveJob { + + private static final long serialVersionUID = 1L; + private List locations; + + public ChannelJobInfo(EntityId entityId, JobId jobId, ActivityState state, JobSpecification spec) { + super(entityId, jobId, state, ChannelJobType.REPETITIVE, spec); + } + + public List getLocations() { + return locations; + + } + + public void setLocations(List locations) { + this.locations = locations; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java new file mode 100644 index 0000000..d1df438 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad; + +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.EnumSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.active.EntityId; +import org.apache.asterix.om.base.AOrderedList; +import org.apache.asterix.om.base.AUUID; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; + +/** + * Provides functionality for running channel jobs and communicating with Brokers + */ +public class ChannelJobService { + + private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName()); + + public static ScheduledExecutorService startJob(JobSpecification jobSpec, EnumSet jobFlags, JobId jobId, + IHyracksClientConnection hcc, long duration) + throws Exception { + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + executeJob(jobSpec, jobFlags, jobId, hcc); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Channel Job Failed to run.", e); + } + } + }, duration, duration, TimeUnit.MILLISECONDS); + return scheduledExecutorService; + } + + public static void executeJob(JobSpecification jobSpec, EnumSet jobFlags, JobId jobId, + IHyracksClientConnection hcc) + throws Exception { + LOGGER.info("Executing Channel Job"); + if (jobId == null) { + hcc.startJob(jobSpec, jobFlags); + } else { + hcc.startJob(jobSpec, jobFlags, jobId); + } + } + + public static void runChannelJob(JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception { + JobId jobId = hcc.startJob(channeljobSpec); + hcc.waitForCompletion(jobId); + } + + public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint, + AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException { + String formattedString; + formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime); + sendMessage(brokerEndpoint, formattedString); + } + + public static String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime) { + String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\"" + + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\"" + + channelExecutionTime + "\", \"subscriptionIds\":["; + for (int i = 0; i < subscriptionIds.size(); i++) { + AUUID subId = (AUUID) subscriptionIds.getItem(i); + String subString = subId.toSimpleString(); + JSON += "\"" + subString + "\""; + if (i < subscriptionIds.size() - 1) { + JSON += ","; + } + } + JSON += "]}"; + return JSON; + + } + + public static long findPeriod(String duration) { + //TODO: Allow Repetitive Channels to use YMD durations + String hoursMinutesSeconds = ""; + if (duration.indexOf('T') != -1) { + hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1); + } + double seconds = 0; + if (hoursMinutesSeconds != "") { + int pos = 0; + if (hoursMinutesSeconds.indexOf('H') != -1) { + Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H'))); + seconds += (hours * 60 * 60); + pos = hoursMinutesSeconds.indexOf('H') + 1; + } + if (hoursMinutesSeconds.indexOf('M') != -1) { + Double minutes = + Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M'))); + seconds += (minutes * 60); + pos = hoursMinutesSeconds.indexOf('M') + 1; + } + if (hoursMinutesSeconds.indexOf('S') != -1) { + Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S'))); + seconds += (s); + } + } + return (long) (seconds * 1000); + } + + public static void sendMessage(String targetURL, String urlParameters) { + HttpURLConnection connection = null; + try { + //Create connection + URL url = new URL(targetURL); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + + connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length)); + connection.setRequestProperty("Content-Language", "en-US"); + + connection.setUseCaches(false); + connection.setDoOutput(true); + + if (connection.getOutputStream() != null) { + //Send message + DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); + wr.writeBytes(urlParameters); + wr.close(); + } else { + LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker."); + } + + if (LOGGER.isLoggable(Level.INFO)) { + int responseCode = connection.getResponseCode(); + LOGGER.info("\nSending 'POST' request to URL : " + url); + LOGGER.info("Post parameters : " + urlParameters); + LOGGER.info("Response Code : " + responseCode); + } + + if (connection.getInputStream() != null) { + BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())); + String inputLine; + StringBuffer response = new StringBuffer(); + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + } + in.close(); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, response.toString()); + } + } else { + LOGGER.log(Level.WARNING, "Channel Failed to get response from Broker."); + } + + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker."); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + @Override + public String toString() { + return "ChannelJobService"; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java new file mode 100644 index 0000000..0a6ced2 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang; + +import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.compiler.provider.IRuleSetFactory; +import org.apache.asterix.lang.aql.rewrites.AQLRewriterFactory; +import org.apache.asterix.lang.aql.visitor.AQLAstPrintVisitorFactory; +import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory; +import org.apache.asterix.lang.common.base.IParserFactory; +import org.apache.asterix.lang.common.base.IRewriterFactory; +import org.apache.asterix.translator.AqlExpressionToPlanTranslatorFactory; + +public class BADCompilationProvider implements ILangCompilationProvider { + + @Override + public IParserFactory getParserFactory() { + return new BADParserFactory(); + } + + @Override + public IRewriterFactory getRewriterFactory() { + return new AQLRewriterFactory(); + } + + @Override + public IAstPrintVisitorFactory getAstPrintVisitorFactory() { + return new AQLAstPrintVisitorFactory(); + } + + @Override + public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() { + return new AqlExpressionToPlanTranslatorFactory(); + } + + @Override + public IRuleSetFactory getRuleSetFactory() { + return new BADRuleSetFactory(); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java new file mode 100644 index 0000000..959600f --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang; + +import java.util.List; + +import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.bad.metadata.Broker; +import org.apache.asterix.bad.metadata.BrokerSearchKey; +import org.apache.asterix.bad.metadata.Channel; +import org.apache.asterix.bad.metadata.ChannelSearchKey; +import org.apache.asterix.bad.metadata.DataverseBrokersSearchKey; +import org.apache.asterix.bad.metadata.DataverseChannelsSearchKey; +import org.apache.asterix.bad.metadata.Procedure; +import org.apache.asterix.bad.metadata.ProcedureSearchKey; +import org.apache.asterix.common.api.ExtensionId; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.compiler.provider.SqlppCompilationProvider; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; + +public class BADLangExtension implements ILangExtension { + + public static final ExtensionId EXTENSION_ID = new ExtensionId(BADLangExtension.class.getSimpleName(), 0); + + @Override + public ExtensionId getId() { + return EXTENSION_ID; + } + + @Override + public void configure(List> args) { + } + + @Override + public ILangCompilationProvider getLangCompilationProvider(Language lang) { + switch (lang) { + case AQL: + return new BADCompilationProvider(); + case SQLPP: + return new SqlppCompilationProvider(); + default: + return null; + } + } + + @Override + public ExtensionKind getExtensionKind() { + return ExtensionKind.LANG; + } + + + public static Broker getBroker(MetadataTransactionContext mdTxnCtx, String dataverseName, String brokerName) + throws AlgebricksException { + BrokerSearchKey brokerSearchKey = new BrokerSearchKey(dataverseName, brokerName); + List brokers = MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey); + if (brokers.isEmpty()) { + return null; + } else if (brokers.size() > 1) { + throw new AlgebricksException("Broker search key returned more than one broker"); + } else { + return brokers.get(0); + } + } + + public static Channel getChannel(MetadataTransactionContext mdTxnCtx, String dataverseName, String channelName) + throws AlgebricksException { + ChannelSearchKey channelSearchKey = new ChannelSearchKey(dataverseName, channelName); + List channels = MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey); + if (channels.isEmpty()) { + return null; + } else if (channels.size() > 1) { + throw new AlgebricksException("Channel search key returned more than one channel"); + } else { + return channels.get(0); + } + } + + public static Procedure getProcedure(MetadataTransactionContext mdTxnCtx, String dataverseName, + String procedureName, String arity) throws AlgebricksException { + ProcedureSearchKey procedureSearchKey = new ProcedureSearchKey(dataverseName, procedureName, arity); + List procedures = MetadataManager.INSTANCE.getEntities(mdTxnCtx, procedureSearchKey); + if (procedures.isEmpty()) { + return null; + } else if (procedures.size() > 1) { + throw new AlgebricksException("Procedure search key returned more than one channel"); + } else { + return procedures.get(0); + } + } + + public static List getBrokers(MetadataTransactionContext mdTxnCtx, String dataverseName) + throws AlgebricksException { + DataverseBrokersSearchKey brokerSearchKey = new DataverseBrokersSearchKey(dataverseName); + return MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey); + } + + public static List getChannels(MetadataTransactionContext mdTxnCtx, String dataverseName) + throws AlgebricksException { + DataverseChannelsSearchKey channelSearchKey = new DataverseChannelsSearchKey(dataverseName); + return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java new file mode 100644 index 0000000..58bca17 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADParserFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang; + +import java.io.Reader; + +import org.apache.asterix.lang.common.base.IParser; +import org.apache.asterix.lang.common.base.IParserFactory; + +public class BADParserFactory implements IParserFactory { + + @Override + public IParser createParser(String query) { + return new BADAQLParser(query); + } + + @Override + public IParser createParser(Reader reader) { + return new BADAQLParser(reader); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java new file mode 100644 index 0000000..20519dd --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorExtension.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang; + +import java.util.List; + +import org.apache.asterix.app.cc.IStatementExecutorExtension; +import org.apache.asterix.common.api.ExtensionId; +import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.hyracks.algebricks.common.utils.Pair; + +public class BADQueryTranslatorExtension implements IStatementExecutorExtension { + + public static final ExtensionId BAD_QUERY_TRANSLATOR_EXTENSION_ID = new ExtensionId( + BADQueryTranslatorExtension.class.getSimpleName(), 0); + + private static class LazyHolder { + private static final IStatementExecutorFactory INSTANCE = new BADQueryTranslatorFactory(); + + } + + @Override + public ExtensionId getId() { + return BAD_QUERY_TRANSLATOR_EXTENSION_ID; + } + + @Override + public void configure(List> args) { + } + + @Override + public IStatementExecutorFactory getQueryTranslatorFactory() { + return LazyHolder.INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java new file mode 100644 index 0000000..958b14f --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang; + +import java.util.List; + +import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.translator.SessionConfig; + +public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory { + + @Override + public QueryTranslator create(List statements, SessionConfig conf, + ILangCompilationProvider compilationProvider) { + return new BADStatementExecutor(statements, conf, compilationProvider); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java new file mode 100644 index 0000000..31d8cd0 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang; + +import java.util.List; + +import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule; +import org.apache.asterix.compiler.provider.DefaultRuleSetFactory; +import org.apache.asterix.compiler.provider.IRuleSetFactory; +import org.apache.asterix.optimizer.base.RuleCollections; +import org.apache.asterix.optimizer.rules.UnnestToDataScanRule; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController; +import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController; +import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +public class BADRuleSetFactory implements IRuleSetFactory { + + @Override + public List>> getLogicalRewrites() + throws AlgebricksException { + List>> logicalRuleSet = DefaultRuleSetFactory.buildLogical(); + if (logicalRuleSet.size() != 14) { + throw new AlgebricksException("Incorrect RuleSet"); + } + List normalizationCollection = RuleCollections.buildNormalizationRuleCollection(); + + for (int i = 0; i < normalizationCollection.size(); i++) { + IAlgebraicRewriteRule rule = normalizationCollection.get(i); + if (rule instanceof UnnestToDataScanRule) { + normalizationCollection.add(i + 1, new InsertBrokerNotifierForChannelRule()); + break; + } + } + SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true); + logicalRuleSet.set(3, new Pair<>(seqOnceCtrl, normalizationCollection)); + logicalRuleSet.set(7, new Pair<>(seqOnceCtrl, normalizationCollection)); + return logicalRuleSet; + } + + @Override + public List>> getPhysicalRewrites() { + return DefaultRuleSetFactory.buildPhysical(); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java new file mode 100644 index 0000000..fa18867 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang; + +import java.util.List; + +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.bad.lang.statement.BrokerDropStatement; +import org.apache.asterix.bad.lang.statement.ChannelDropStatement; +import org.apache.asterix.bad.metadata.Broker; +import org.apache.asterix.bad.metadata.Channel; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.lang.common.statement.DataverseDropStatement; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.translator.SessionConfig; +import org.apache.hyracks.api.client.IHyracksClientConnection; + +public class BADStatementExecutor extends QueryTranslator { + + public BADStatementExecutor(List aqlStatements, SessionConfig conf, + ILangCompilationProvider compliationProvider) { + super(aqlStatements, conf, compliationProvider); + } + + + @Override + protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt, + IHyracksClientConnection hcc) throws Exception { + //TODO: Remove this when metadata dependencies are in place + //TODO: Stop dataset drop when dataset used by channel + super.handleDataverseDropStatement(metadataProvider, stmt, hcc); + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName(); + List brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue()); + for (Broker broker : brokers) { + BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false); + drop.handle(this, metadataProvider, hcc, null, null, null, 0); + } + List channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue()); + for (Channel channel : channels) { + ChannelDropStatement drop = new ChannelDropStatement(dvId, + new Identifier(channel.getChannelId().getEntityName()), false); + drop.handle(this, metadataProvider, hcc, null, null, null, 0); + } + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java new file mode 100644 index 0000000..7894c44 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang.statement; + +import org.apache.asterix.algebra.extension.IExtensionStatement; +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.bad.lang.BADLangExtension; +import org.apache.asterix.bad.metadata.Broker; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.visitor.base.ILangVisitor; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; +import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.dataset.IHyracksDataset; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class BrokerDropStatement implements IExtensionStatement { + + private final Identifier dataverseName; + private final Identifier brokerName; + private boolean ifExists; + + public BrokerDropStatement(Identifier dataverseName, Identifier brokerName, boolean ifExists) { + this.brokerName = brokerName; + this.dataverseName = dataverseName; + this.ifExists = ifExists; + } + + public boolean getIfExists() { + return ifExists; + } + + public Identifier getDataverseName() { + return dataverseName; + } + + public Identifier getBrokerName() { + return brokerName; + } + + @Override + public byte getKind() { + return Kind.EXTENSION; + } + + @Override + public byte getCategory() { + return Category.DDL; + } + + @Override + public R accept(ILangVisitor visitor, T arg) throws AsterixException { + return null; + } + + @Override + public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, + int resultSetIdCounter) throws HyracksDataException, AlgebricksException { + //TODO: dont drop a broker that's being used + String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); + MetadataTransactionContext mdTxnCtx = null; + try { + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + Broker broker = BADLangExtension.getBroker(mdTxnCtx, dataverse, brokerName.getValue()); + if (broker == null) { + throw new AlgebricksException("A broker with this name " + brokerName + " doesn't exist."); + } + MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, broker); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + } catch (Exception e) { + QueryTranslator.abort(e, e, mdTxnCtx); + throw new HyracksDataException(e); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java new file mode 100644 index 0000000..6811ef2 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang.statement; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.asterix.active.ActiveJobNotificationHandler; +import org.apache.asterix.active.ActiveRuntimeId; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.message.ActiveManagerMessage; +import org.apache.asterix.algebra.extension.IExtensionStatement; +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.bad.BADConstants; +import org.apache.asterix.bad.ChannelJobInfo; +import org.apache.asterix.bad.lang.BADLangExtension; +import org.apache.asterix.bad.metadata.Channel; +import org.apache.asterix.bad.metadata.ChannelEventsListener; +import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorNodePushable; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber; +import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent; +import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber; +import org.apache.asterix.lang.common.statement.DropDatasetStatement; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.visitor.base.ILangVisitor; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.runtime.util.AsterixAppContextInfo; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; +import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.dataset.IHyracksDataset; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class ChannelDropStatement implements IExtensionStatement { + + private final Identifier dataverseName; + private final Identifier channelName; + private boolean ifExists; + + public ChannelDropStatement(Identifier dataverseName, Identifier channelName, boolean ifExists) { + this.dataverseName = dataverseName; + this.channelName = channelName; + this.ifExists = ifExists; + } + + public Identifier getDataverseName() { + return dataverseName; + } + + public Identifier getChannelName() { + return channelName; + } + + public boolean getIfExists() { + return ifExists; + } + + @Override + public byte getKind() { + return Kind.EXTENSION; + } + + @Override + public byte getCategory() { + return Category.DDL; + } + + @Override + public R accept(ILangVisitor visitor, T arg) throws AsterixException { + return null; + } + + @Override + public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, + int resultSetIdCounter) throws HyracksDataException, AlgebricksException { + + String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); + boolean txnActive = false; + EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue()); + ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE + .getActiveEntityListener(entityId); + IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber(); + boolean subscriberRegistered = false; + Channel channel = null; + + MetadataTransactionContext mdTxnCtx = null; + try { + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + txnActive = true; + channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue()); + txnActive = false; + if (channel == null) { + if (ifExists) { + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + return; + } else { + throw new AlgebricksException("There is no channel with this name " + channelName + "."); + } + } + if (listener != null) { + subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber); + } + if (!subscriberRegistered) { + throw new AsterixException("Channel " + channelName + " is not running"); + } + + ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext() + .getMessageBroker(); + + ChannelJobInfo cInfo = listener.getJobInfo(channel.getChannelId());; + Set ncs = new HashSet<>(cInfo.getLocations()); + AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint( + ncs.toArray(new String[ncs.size()])); + int partition = 0; + for (String location : locations.getLocations()) { + messageBroker.sendApplicationMessageToNC( + new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "cc", + new ActiveRuntimeId(channel.getChannelId(), + RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition++)), + location); + } + eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_ENDED); + + //Drop the Channel Datasets + //TODO: Need to find some way to handle if this fails. + //TODO: Prevent datasets for Channels from being dropped elsewhere + DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse), + new Identifier(channel.getResultsDatasetName()), true); + ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc); + + dropStmt = new DropDatasetStatement(new Identifier(dataverse), + new Identifier(channel.getSubscriptionsDataset()), true); + ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc); + + if (subscriberRegistered) { + listener.deregisterEventSubscriber(eventSubscriber); + } + + //Remove the Channel Metadata + MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + } catch (Exception e) { + e.printStackTrace(); + if (txnActive) { + QueryTranslator.abort(e, e, mdTxnCtx); + } + throw new HyracksDataException(e); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java new file mode 100644 index 0000000..dc10742 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang.statement; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.algebra.extension.IExtensionStatement; +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.bad.BADConstants; +import org.apache.asterix.bad.lang.BADLangExtension; +import org.apache.asterix.bad.metadata.Broker; +import org.apache.asterix.bad.metadata.Channel; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.lang.aql.expression.FLWOGRExpression; +import org.apache.asterix.lang.common.base.Clause; +import org.apache.asterix.lang.common.base.Expression; +import org.apache.asterix.lang.common.clause.LetClause; +import org.apache.asterix.lang.common.expression.CallExpr; +import org.apache.asterix.lang.common.expression.FieldAccessor; +import org.apache.asterix.lang.common.expression.FieldBinding; +import org.apache.asterix.lang.common.expression.LiteralExpr; +import org.apache.asterix.lang.common.expression.RecordConstructor; +import org.apache.asterix.lang.common.expression.VariableExpr; +import org.apache.asterix.lang.common.literal.StringLiteral; +import org.apache.asterix.lang.common.statement.InsertStatement; +import org.apache.asterix.lang.common.statement.Query; +import org.apache.asterix.lang.common.statement.UpsertStatement; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.struct.VarIdentifier; +import org.apache.asterix.lang.common.visitor.base.ILangVisitor; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.om.functions.AsterixBuiltinFunctions; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; +import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.dataset.IHyracksDataset; +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class ChannelSubscribeStatement implements IExtensionStatement { + + private final Identifier dataverseName; + private final Identifier channelName; + private final Identifier brokerDataverseName; + private final Identifier brokerName; + private final List argList; + private final String subscriptionId; + private final int varCounter; + + public ChannelSubscribeStatement(Identifier dataverseName, Identifier channelName, List argList, + int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId) { + this.channelName = channelName; + this.dataverseName = dataverseName; + this.brokerDataverseName = brokerDataverseName; + this.brokerName = brokerName; + this.argList = argList; + this.subscriptionId = subscriptionId; + this.varCounter = varCounter; + } + + public Identifier getDataverseName() { + return dataverseName; + } + + public Identifier getBrokerDataverseName() { + return brokerDataverseName; + } + + public Identifier getChannelName() { + return channelName; + } + + public Identifier getBrokerName() { + return brokerName; + } + + public List getArgList() { + return argList; + } + + public int getVarCounter() { + return varCounter; + } + + public String getSubscriptionId() { + return subscriptionId; + } + + @Override + public byte getKind() { + return Kind.EXTENSION; + } + + @Override + public byte getCategory() { + return Category.QUERY; + } + + @Override + public R accept(ILangVisitor visitor, T arg) throws AsterixException { + return null; + } + + @Override + public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, + int resultSetIdCounter) throws HyracksDataException, AlgebricksException { + + String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); + String brokerDataverse = ((QueryTranslator) statementExecutor) +.getActiveDataverse(brokerDataverseName); + + MetadataTransactionContext mdTxnCtx = null; + try { + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + + Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue()); + if (channel == null) { + throw new AsterixException("There is no channel with this name " + channelName + "."); + } + Broker broker = BADLangExtension.getBroker(mdTxnCtx, brokerDataverse, brokerName.getValue()); + if (broker == null) { + throw new AsterixException("There is no broker with this name " + brokerName + "."); + } + + String subscriptionsDatasetName = channel.getSubscriptionsDataset(); + + if (argList.size() != channel.getFunction().getArity()) { + throw new AsterixException("Channel expected " + channel.getFunction().getArity() + + " parameters but got " + argList.size()); + } + + Query subscriptionTuple = new Query(false); + + List fb = new ArrayList(); + LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName)); + Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse)); + fb.add(new FieldBinding(leftExpr, rightExpr)); + + leftExpr = new LiteralExpr(new StringLiteral(BADConstants.BrokerName)); + rightExpr = new LiteralExpr(new StringLiteral(broker.getBrokerName())); + fb.add(new FieldBinding(leftExpr, rightExpr)); + + if (subscriptionId != null) { + leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId)); + + List UUIDList = new ArrayList(); + UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId))); + FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR; + FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(), + function.getArity()); + CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList); + + rightExpr = UUIDCall; + fb.add(new FieldBinding(leftExpr, rightExpr)); + } + + for (int i = 0; i < argList.size(); i++) { + leftExpr = new LiteralExpr(new StringLiteral("param" + i)); + rightExpr = argList.get(i); + fb.add(new FieldBinding(leftExpr, rightExpr)); + } + RecordConstructor recordCon = new RecordConstructor(fb); + subscriptionTuple.setBody(recordCon); + + subscriptionTuple.setVarCounter(varCounter); + + if (subscriptionId == null) { + + VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1)); + VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1)); + VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0)); + VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0)); + useResultVar.setIsNewVar(false); + useSubscriptionVar.setIsNewVar(false); + Query returnQuery = new Query(false); + List clauseList = new ArrayList<>(); + LetClause let = new LetClause(subscriptionVar, + new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId))); + clauseList.add(let); + FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar); + returnQuery.setBody(body); + + metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); + metadataProvider.setResultAsyncMode( + resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED); + InsertStatement insert = new InsertStatement(new Identifier(dataverse), + new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, + returnQuery); + ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc, + resultDelivery, stats, false); + } else { + UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse), + new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null); + ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc, + resultDelivery, stats, false); + } + + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + } catch (Exception e) { + QueryTranslator.abort(e, e, mdTxnCtx); + throw new HyracksDataException(e); + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java new file mode 100644 index 0000000..17a54ec --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.lang.statement; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.algebra.extension.IExtensionStatement; +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.bad.BADConstants; +import org.apache.asterix.bad.lang.BADLangExtension; +import org.apache.asterix.bad.metadata.Channel; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor; +import org.apache.asterix.lang.common.base.Expression; +import org.apache.asterix.lang.common.expression.CallExpr; +import org.apache.asterix.lang.common.expression.FieldAccessor; +import org.apache.asterix.lang.common.expression.LiteralExpr; +import org.apache.asterix.lang.common.expression.OperatorExpr; +import org.apache.asterix.lang.common.expression.VariableExpr; +import org.apache.asterix.lang.common.literal.StringLiteral; +import org.apache.asterix.lang.common.statement.DeleteStatement; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.visitor.base.ILangVisitor; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.om.functions.AsterixBuiltinFunctions; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; +import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.dataset.IHyracksDataset; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class ChannelUnsubscribeStatement implements IExtensionStatement { + + private final Identifier dataverseName; + private final Identifier channelName; + private final String subscriptionId; + private final int varCounter; + private VariableExpr vars; + private List dataverses; + private List datasets; + + public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName, + String subscriptionId, int varCounter, List dataverses, List datasets) { + this.vars = vars; + this.channelName = channelName; + this.dataverseName = dataverseName; + this.subscriptionId = subscriptionId; + this.varCounter = varCounter; + this.dataverses = dataverses; + this.datasets = datasets; + } + + public Identifier getDataverseName() { + return dataverseName; + } + + public VariableExpr getVariableExpr() { + return vars; + } + + public Identifier getChannelName() { + return channelName; + } + + public String getsubScriptionId() { + return subscriptionId; + } + + public List getDataverses() { + return dataverses; + } + + public List getDatasets() { + return datasets; + } + + public int getVarCounter() { + return varCounter; + } + + @Override + public byte getKind() { + return Kind.EXTENSION; + } + + @Override + public byte getCategory() { + return Category.UPDATE; + } + + @Override + public R accept(ILangVisitor visitor, T arg) throws AsterixException { + return null; + } + + @Override + public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, + int resultSetIdCounter) throws HyracksDataException, AlgebricksException { + String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); + + MetadataTransactionContext mdTxnCtx = null; + try { + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + + Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue()); + if (channel == null) { + throw new AsterixException("There is no channel with this name " + channelName + "."); + } + + String subscriptionsDatasetName = channel.getSubscriptionsDataset(); + + //Need a condition to say subscription-id = sid + OperatorExpr condition = new OperatorExpr(); + FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId)); + condition.addOperand(fa); + condition.setCurrentop(true); + condition.addOperator("="); + + List UUIDList = new ArrayList(); + UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId))); + + FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR; + FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(), + function.getArity()); + CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList); + + condition.addOperand(UUIDCall); + + DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse), + new Identifier(subscriptionsDatasetName), condition, varCounter, dataverses, datasets); + AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor(); + delete.accept(visitor, null); + + ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + } catch (Exception e) { + QueryTranslator.abort(e, e, mdTxnCtx); + throw new HyracksDataException(e); + } + } +} \ No newline at end of file