From: sjaco002@apache.org
To: commits@asterixdb.apache.org
Subject: asterixdb-bad git commit: Add push-based channels and improve broker notifications
Date: Sat, 14 Apr 2018 18:08:32 +0000 (UTC)

Repository: asterixdb-bad
Updated Branches:
  refs/heads/master 9e13d7255 -> 345b0f572


Add push-based channels and improve broker notifications

Change-Id: Ie3c7cae0f015d6bc01dd912499565bb12c15abc3


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/345b0f57
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/345b0f57
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/345b0f57

Branch: refs/heads/master
Commit: 345b0f5729d3a6ed0564707ec25b56750c5366ec
Parents: 9e13d72
Author: Steven Glenn Jacobs
Authored: Thu Apr 12 13:19:50 2018 -0700
Committer: Steven Glenn Jacobs
Committed: Thu Apr 12 13:19:50 2018 -0700

----------------------------------------------------------------------
 .../apache/asterix/bad/ChannelJobService.java   |  92 +-------
 .../lang/statement/CreateChannelStatement.java  |  46 +++-
 .../InsertBrokerNotifierForChannelRule.java     | 232 ++++++++++++-------
 .../bad/runtime/NotifyBrokerOperator.java       |  34 ++-
 .../bad/runtime/NotifyBrokerPOperator.java      |  13 +-
 .../bad/runtime/NotifyBrokerRuntime.java        | 141 +++++++++--
 .../bad/runtime/NotifyBrokerRuntimeFactory.java |  18 +-
 .../src/main/resources/lang-extension/lang.txt  |   7 +-
 .../queries/channel/channel-push.sqlpp          |  85 +++++++
 .../results/channel/channel-push.plan           |  64 +++++
 10 files changed, 505 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index 41853b9..3df9a76 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -18,51 +18,16 @@ */ package org.apache.asterix.bad; -import
java.io.BufferedReader; -import java.io.DataOutputStream; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.om.base.AOrderedList; -import org.apache.asterix.om.base.AUUID; -import org.apache.hyracks.api.exceptions.HyracksDataException; /** - * Provides functionality for channel jobs and communicating with Brokers + * Provides functionality for channel jobs */ public class ChannelJobService { private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName()); - public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint, - AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException { - String formattedString; - formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime); - sendMessage(brokerEndpoint, formattedString); - } - - public static String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime) { - String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\"" - + activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\"" - + channelExecutionTime + "\", \"subscriptionIds\":["; - for (int i = 0; i < subscriptionIds.size(); i++) { - AUUID subId = (AUUID) subscriptionIds.getItem(i); - String subscriptionString = subId.toString(); - //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value - subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2); - JSON += "\"" + subscriptionString + "\""; - if (i < subscriptionIds.size() - 1) { - JSON += ","; - } - } - JSON += "]}"; - return JSON; - - } public static long findPeriod(String duration) { //TODO: Allow Repetitive Channels to use YMD durations @@ -92,61 +57,6 @@ public class ChannelJobService { return (long) (seconds * 1000); } - public static void sendMessage(String targetURL, String urlParameters) { - HttpURLConnection connection = null; - try { - //Create connection - URL url = new URL(targetURL); - connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); - - connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length)); - connection.setRequestProperty("Content-Language", "en-US"); - - connection.setUseCaches(false); - connection.setDoOutput(true); - connection.setConnectTimeout(500); - - if (connection.getOutputStream() != null) { - //Send message - DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); - wr.writeBytes(urlParameters); - wr.close(); - } else { - throw new Exception(); - } - - if (LOGGER.isLoggable(Level.INFO)) { - int responseCode = connection.getResponseCode(); - LOGGER.info("\nSending 'POST' request to URL : " + url); - LOGGER.info("Post parameters : " + urlParameters); - LOGGER.info("Response Code : " + responseCode); - } - - if (connection.getInputStream() != null) { - BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())); - String inputLine; - StringBuffer response = new StringBuffer(); - while ((inputLine = in.readLine()) != null) { - response.append(inputLine); - } - in.close(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.log(Level.INFO, 
response.toString()); - } - } else { - LOGGER.log(Level.WARNING, "Channel Failed to get response from Broker."); - } - - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker."); - } finally { - if (connection != null) { - connection.disconnect(); - } - } - } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index 161f093..87ac320 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -40,7 +40,6 @@ import org.apache.asterix.bad.lang.BADParserFactory; import org.apache.asterix.bad.metadata.Channel; import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener; import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType; -import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -48,6 +47,7 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.expression.CallExpr; @@ -59,6 +59,7 @@ import org.apache.asterix.lang.common.statement.DatasetDecl; import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl; import org.apache.asterix.lang.common.statement.InsertStatement; import org.apache.asterix.lang.common.statement.InternalDetailsDecl; +import org.apache.asterix.lang.common.statement.Query; import org.apache.asterix.lang.common.statement.SetStatement; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; @@ -92,14 +93,16 @@ public class CreateChannelStatement extends ExtensionStatement { private String subscriptionsTableName; private String resultsTableName; private String dataverse; + private final boolean push; public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function, - Expression period) { + Expression period, boolean push) { this.channelName = channelName; this.dataverseName = dataverseName; this.function = function; this.period = (CallExpr) period; this.duration = ""; + this.push = push; } public Identifier getDataverseName() { @@ -218,12 +221,37 @@ public class CreateChannelStatement extends ExtensionStatement { } + private JobSpecification compilePushChannel(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + IHyracksClientConnection hcc, Query q) throws Exception { + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + boolean bActiveTxn = true; + 
metadataProvider.setMetadataTxnContext(mdTxnCtx); + JobSpecification jobSpec = null; + try { + jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + bActiveTxn = false; + } catch (Exception e) { + LOGGER.log(Level.INFO, e.getMessage(), e); + if (bActiveTxn) { + ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx); + } + throw e; + } finally { + metadataProvider.getLocks().unlock(); + } + return jobSpec; + } + private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception { StringBuilder builder = new StringBuilder(); builder.append("SET inline_with \"false\";\n"); - builder.append("insert into " + dataverse + "." + resultsTableName); - builder.append(" as a (\n" + "with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n"); + if (!push) { + builder.append("insert into " + dataverse + "." + resultsTableName); + builder.append(" as a (\n"); + } + builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n"); builder.append("select result, "); builder.append(BADConstants.ChannelExecutionTime + ", "); builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ","); @@ -238,15 +266,19 @@ public class CreateChannelStatement extends ExtensionStatement { builder.append("sub.param" + i + ") result \n"); builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n"); builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n"); - builder.append(")"); - builder.append(" returning a"); + if (!push) { + builder.append(")"); + builder.append(" returning a"); + } builder.append(";"); BADParserFactory factory = new BADParserFactory(); List fStatements = factory.createParser(new StringReader(builder.toString())).parse(); SetStatement ss = (SetStatement) fStatements.get(0); metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue()); - + if (push) { + return compilePushChannel(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(1)); + } return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null); } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java index d83b606..9ead7f0 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java @@ -31,6 +31,7 @@ import org.apache.asterix.metadata.declared.DatasetDataSource; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.constants.AsterixConstantValue; import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.IAType; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -43,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; @@ -74,133 +76,197 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) { return false; } + boolean push = false; + AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue(); if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) { - return false; + if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) { + return false; + } + push = true; } - DelegateOperator eOp = (DelegateOperator) op; - if (!(eOp.getDelegate() instanceof CommitOperator)) { - return false; + DataSourceScanOperator subscriptionsScan; + String channelDataverse; + String channelName; + + if (!push) { + DelegateOperator eOp = (DelegateOperator) op; + if (!(eOp.getDelegate() instanceof CommitOperator)) { + return false; + } + AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue(); + if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) { + return false; + } + InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp; + if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) { + return false; + } + DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource(); + String datasetName = dds.getDataset().getDatasetName(); + if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata") + || !dds.getDataset().getItemTypeName().equals("ChannelResultsType") + || !datasetName.endsWith("Results")) { + return false; + } + channelDataverse = dds.getDataset().getDataverseName(); + //Now we know that we are inserting into results + + channelName = datasetName.substring(0, datasetName.length() - 7); + String subscriptionsName = channelName + "Subscriptions"; + subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName); + if (subscriptionsScan == null) { + return false; + } + + } else { + //if push, get the channel name here instead + subscriptionsScan = (DataSourceScanOperator) findOp(op, ""); + if (subscriptionsScan == null) { + return false; + } + DatasetDataSource dds = (DatasetDataSource) subscriptionsScan.getDataSource(); + String datasetName = dds.getDataset().getDatasetName(); + channelDataverse = dds.getDataset().getDataverseName(); + channelName = datasetName.substring(0, datasetName.length() - 13); } - AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue(); - if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) { + + //Now we need to get the broker EndPoint + LogicalVariable brokerEndpointVar = context.newVar(); + AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers"); + if (opAboveBrokersScan == null) { return false; } - 
InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp; - if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) { + + //get subscriptionIdVar + LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0); + + //The channelExecutionTime is created just before the scan + ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue(); + if (channelExecutionAssign.getOperatorTag() != LogicalOperatorTag.ASSIGN) { return false; } - DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource(); - String datasetName = dds.getDataset().getDatasetName(); - if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata") - || !dds.getDataset().getItemTypeName().equals("ChannelResultsType") - || !datasetName.endsWith("Results")) { + LogicalVariable channelExecutionVar = ((AssignOperator) channelExecutionAssign).getVariables().get(0); + if (!channelExecutionVar.toString().equals("$$" + BADConstants.ChannelExecutionTime)) { return false; } - String channelDataverse = dds.getDataset().getDataverseName(); - //Now we know that we are inserting into results - String channelName = datasetName.substring(0, datasetName.length() - 7); - String subscriptionsName = channelName + "Subscriptions"; - //TODO: Can we check here to see if there is a channel with such a name? - - DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName); - if (subscriptionsScan == null) { - return false; + if (!push) { + ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false); } - //Now we want to make sure and set the commit to be a nonsink commit - ((CommitOperator) eOp.getDelegate()).setSink(false); - - //Now we need to get the broker EndPoint - LogicalVariable brokerEndpointVar = context.newVar(); - AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers"); AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan); //now brokerNameVar holds the brokerName for use farther up in the plan context.computeAndSetTypeEnvironmentForOperator(assignOp); context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan); - context.computeAndSetTypeEnvironmentForOperator(eOp); - - //get subscriptionIdVar - LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0); + context.computeAndSetTypeEnvironmentForOperator(op); - //The channelExecutionTime is created just before the scan - LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue()) - .getVariables().get(0); - - ProjectOperator badProject = (ProjectOperator) findOp(op, "project"); + ProjectOperator badProject = (ProjectOperator) findOp(op1, "project"); badProject.getVariables().add(subscriptionIdVar); badProject.getVariables().add(brokerEndpointVar); badProject.getVariables().add(channelExecutionVar); context.computeAndSetTypeEnvironmentForOperator(badProject); + //Create my brokerNotify plan above the extension Operator - DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, - context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName); + DelegateOperator dOp = push + ? 
createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar, + context, op, (DistributeResultOperator) op1, channelDataverse, channelName) + : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op, + (DistributeResultOperator) op1, channelDataverse, channelName); opRef.setValue(dOp); return true; } - private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar, - LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context, + private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar, + LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push, + IAType resultType) { + NotifyBrokerOperator notifyBrokerOp = + new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push, resultType); + EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName); + NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId); + notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp); + DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp); + extensionOp.setPhysicalOperator(notifyBrokerPOp); + return extensionOp; + } + + private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar, + LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp, + DistributeResultOperator distributeOp, String channelDataverse, String channelName) + throws AlgebricksException { + //Find the assign operator to get the result type that we need + AbstractLogicalOperator assign = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue(); + while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) { + assign = (AbstractLogicalOperator) assign.getInputs().get(0).getValue(); + } + IVariableTypeEnvironment env = assign.computeOutputTypeEnvironment(context); + IAType resultType = (IAType) env.getVarType(sendVar); + + //Create the NotifyBrokerOperator + DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, channelDataverse, + channelName, true, resultType); + + extensionOp.getInputs().add(new MutableObject<>(eOp)); + context.computeAndSetTypeEnvironmentForOperator(extensionOp); + + return extensionOp; + + } + + private DelegateOperator createNotifyBrokerPullPlan(LogicalVariable brokerEndpointVar, + LogicalVariable sendVar, LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName) throws AlgebricksException { - //create the Distinct Op - ArrayList> expressions = new ArrayList>(); - VariableReferenceExpression vExpr = new VariableReferenceExpression(subscriptionIdVar); - expressions.add(new MutableObject(vExpr)); + + //Create the Distinct Op + ArrayList> expressions = new ArrayList<>(); + VariableReferenceExpression vExpr = new VariableReferenceExpression(sendVar); + expressions.add(new MutableObject<>(vExpr)); DistinctOperator distinctOp = new DistinctOperator(expressions); - //create the GroupBy Op - //And set the distinct as input - List>> groupByList = new ArrayList>>(); - List>> groupByDecorList = new ArrayList>>(); - List nestedPlans = new ArrayList(); - //create group by operator + List>> groupByList = new ArrayList<>(); + List>> groupByDecorList = new ArrayList<>(); + List nestedPlans = new 
ArrayList<>(); + + //Create GroupBy operator GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans); groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar)); groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar)); - groupbyOp.getInputs().add(new MutableObject(distinctOp)); + + //Set the distinct as input + groupbyOp.getInputs().add(new MutableObject<>(distinctOp)); //create nested plan for subscription ids in group by - NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator( - new MutableObject(groupbyOp)); - //TODO: This is from translationcontext. It might be needed to make the variable exist outside of the subplan - //LogicalVariable subscriptionListVar = context.newSubplanOutputVar(); - LogicalVariable subscriptionListVar = context.newVar(); - List aggVars = new ArrayList(); - aggVars.add(subscriptionListVar); - AggregateFunctionCallExpression funAgg = BuiltinFunctions.makeAggregateFunctionExpression( - BuiltinFunctions.LISTIFY, new ArrayList>()); - funAgg.getArguments() - .add(new MutableObject(new VariableReferenceExpression(subscriptionIdVar))); - List> aggExpressions = new ArrayList>(); - aggExpressions.add(new MutableObject(funAgg)); + NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(new MutableObject<>(groupbyOp)); + LogicalVariable sendListVar = context.newVar(); + List aggVars = new ArrayList<>(); + aggVars.add(sendListVar); + AggregateFunctionCallExpression funAgg = + BuiltinFunctions.makeAggregateFunctionExpression(BuiltinFunctions.LISTIFY, new ArrayList<>()); + funAgg.getArguments().add(new MutableObject<>(new VariableReferenceExpression(sendVar))); + List> aggExpressions = new ArrayList<>(); + aggExpressions.add(new MutableObject<>(funAgg)); AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions); - listifyOp.getInputs().add(new MutableObject(nestedTupleSourceOp)); + listifyOp.getInputs().add(new MutableObject<>(nestedTupleSourceOp)); //add nested plans - nestedPlans.add(new ALogicalPlanImpl(new MutableObject(listifyOp))); + nestedPlans.add(new ALogicalPlanImpl(new MutableObject<>(listifyOp))); + //Create the NotifyBrokerOperator - NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar, - channelExecutionVar); - EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName); - NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId); - notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp); - DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp); - extensionOp.setPhysicalOperator(notifyBrokerPOp); - extensionOp.getInputs().add(new MutableObject(groupbyOp)); + DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar, + channelDataverse, channelName, false, null); - //Set the input for the brokerNotify as the replicate operator - distinctOp.getInputs().add(new MutableObject(eOp)); + //Set the input for the distinct as the old top + extensionOp.getInputs().add(new MutableObject<>(groupbyOp)); + distinctOp.getInputs().add(new MutableObject<>(eOp)); //compute environment bottom up - context.computeAndSetTypeEnvironmentForOperator(distinctOp); context.computeAndSetTypeEnvironmentForOperator(groupbyOp); context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp); @@ -211,7 +277,6 @@ public class InsertBrokerNotifierForChannelRule implements 
IAlgebraicRewriteRule } - @SuppressWarnings("unchecked") private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar, AbstractLogicalOperator opAboveBrokersScan) { Mutable fieldRef = new MutableObject( @@ -244,9 +309,10 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule return assignOp; } - /*This function searches for the needed op - * If lookingForBrokers, find the op above the brokers scan - * Else find the suscbriptionsScan + /*This function is used to find specific operators within the plan, either + * A. The brokers dataset scan + * B. The subscriptions scan + * C. The highest project of the plan */ private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) { if (!op.hasInputs()) { @@ -311,7 +377,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource(); if (dds.getDataset().getItemTypeDataverseName().equals("Metadata") && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) { - if (dds.getDataset().getDatasetName().equals(subscriptionsName)) { + if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) { return true; } } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java index d281b49..df0f0f4 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java @@ -20,6 +20,7 @@ package org.apache.asterix.bad.runtime; import java.util.Collection; +import org.apache.asterix.om.types.IAType; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator; @@ -27,22 +28,26 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDel import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; /** - * A repetitive channel operator, which uses a Java timer to run a given query periodically + * An operator for sending broker notifications */ public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { - private final LogicalVariable subscriptionIdVar; private final LogicalVariable brokerEndpointVar; private final LogicalVariable channelExecutionVar; + private final LogicalVariable pushListVar; + private final boolean push; + private final IAType recordType; - public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar, - LogicalVariable resultSetVar) { + public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable pushListVar, + LogicalVariable resultSetVar, boolean push, IAType recordType) { this.brokerEndpointVar = brokerEndpointVar; - this.subscriptionIdVar = subscriptionIdVar; this.channelExecutionVar = resultSetVar; + this.pushListVar = pushListVar; + this.push = push; + this.recordType = recordType; } - public LogicalVariable 
getSubscriptionVariable() { - return subscriptionIdVar; + public LogicalVariable getPushListVar() { + return pushListVar; } public LogicalVariable getBrokerEndpointVariable() { @@ -53,9 +58,18 @@ public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { return channelExecutionVar; } + public IAType getRecordType() { + return recordType; + } + + public boolean getPush() { + return push; + } + @Override public String toString() { - return "notify-brokers"; + return "notify-brokers (" + brokerEndpointVar.toString() + "," + channelExecutionVar.toString() + "," + + pushListVar.toString() + ")"; } @Override @@ -65,7 +79,7 @@ public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { @Override public IOperatorDelegate newInstance() { - return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar); + return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push, recordType); } @Override @@ -76,7 +90,7 @@ public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { @Override public void getUsedVariables(Collection usedVars) { - usedVars.add(subscriptionIdVar); + usedVars.add(pushListVar); usedVars.add(brokerEndpointVar); usedVars.add(channelExecutionVar); } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java index 12d5ae2..b9cfbfd 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java @@ -20,6 +20,7 @@ package org.apache.asterix.bad.runtime; import org.apache.asterix.active.EntityId; +import org.apache.asterix.om.types.IAType; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -74,20 +75,22 @@ public class NotifyBrokerPOperator extends AbstractPhysicalOperator { IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { DelegateOperator notify = (DelegateOperator) op; - LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable(); + LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar(); LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable(); LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable(); + IAType recordType = ((NotifyBrokerOperator) notify.getDelegate()).getRecordType(); + boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush(); int brokerColumn = inputSchemas[0].findVariable(brokerVar); - int subColumn = inputSchemas[0].findVariable(subVar); + int pushColumn = inputSchemas[0].findVariable(pushListVar); int executionColumn = inputSchemas[0].findVariable(executionVar); IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn); - IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn); + IScalarEvaluatorFactory pushListEvalFactory 
= new ColumnAccessEvalFactory(pushColumn); IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn); - NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory, - channelExecutionEvalFactory, entityId); + NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, pushListEvalFactory, + channelExecutionEvalFactory, entityId, push, recordType); RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java index 5d51926..6ffb244 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java @@ -20,21 +20,33 @@ package org.apache.asterix.bad.runtime; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.active.ActiveManager; import org.apache.asterix.active.EntityId; -import org.apache.asterix.bad.ChannelJobService; +import org.apache.asterix.bad.BADConstants; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer; +import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer; import org.apache.asterix.om.base.ADateTime; import org.apache.asterix.om.base.AOrderedList; -import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.base.ARecord; +import org.apache.asterix.om.base.AUUID; import org.apache.asterix.om.types.AOrderedListType; +import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; @@ -48,11 +60,13 @@ import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName()); private final ByteBufferInputStream bbis = new ByteBufferInputStream(); private final DataInputStream di = new DataInputStream(bbis); private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null)); + private final ARecordSerializerDeserializer recordSerDes; private IPointable 
inputArg0 = new VoidPointable(); private IPointable inputArg1 = new VoidPointable(); @@ -62,17 +76,29 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu private IScalarEvaluator eval2; private final ActiveManager activeManager; private final EntityId entityId; + private final boolean push; + private AOrderedList pushList; + private ARecord pushRecord; + private final IAType recordType; + private final Map> sendData = new HashMap<>(); + private String executionTimeString; public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory, - IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory, - EntityId activeJobId) throws HyracksDataException { + IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory, + EntityId activeJobId, boolean push, IAType recordType) throws HyracksDataException { this.tRef = new FrameTupleReference(); eval0 = brokerEvalFactory.createScalarEvaluator(ctx); - eval1 = subEvalFactory.createScalarEvaluator(ctx); + eval1 = pushListEvalFactory.createScalarEvaluator(ctx); eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx); this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext() .getApplicationContext()).getActiveManager(); this.entityId = activeJobId; + this.push = push; + this.pushList = null; + this.pushRecord = null; + this.recordType = recordType; + recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType); + executionTimeString = null; } @Override @@ -80,6 +106,61 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu return; } + private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) { + for (int i = 0; i < subscriptionIds.size(); i++) { + AUUID subId = (AUUID) subscriptionIds.getItem(i); + String subscriptionString = subId.toString(); + //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value + subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2); + subscriptionString = "\"" + subscriptionString + "\""; + sendData.get(endpoint).add(subscriptionString); + } + } + + public String createData(String endpoint) { + String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\"" + + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\"" + + executionTimeString + "\", \"subscriptionIds\":["; + for (String value : sendData.get(endpoint)) { + JSON += value; + JSON += ","; + } + JSON = JSON.substring(0, JSON.length() - 1); + JSON += "]}"; + return JSON; + + } + + private void sendGroupOfResults(String endpoint) { + String urlParameters = createData(endpoint); + try { + //Create connection + URL url = new URL(endpoint); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + + connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length)); + connection.setRequestProperty("Content-Language", "en-US"); + + connection.setUseCaches(false); + connection.setDoOutput(true); + connection.setConnectTimeout(500); + DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); + wr.writeBytes(urlParameters); + if (LOGGER.isLoggable(Level.INFO)) { + int responseCode = connection.getResponseCode(); + 
LOGGER.info("\nSending 'POST' request to URL : " + url); + LOGGER.info("Post parameters : " + urlParameters); + LOGGER.info("Response Code : " + responseCode); + } + wr.close(); + connection.disconnect(); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker."); + } + } + @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { tAccess.reset(buffer); @@ -91,33 +172,47 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu eval1.evaluate(tRef, inputArg1); eval2.evaluate(tRef, inputArg2); + if (executionTimeString == null) { + int resultSetOffset = inputArg2.getStartOffset(); + bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1); + ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di); + try { + executionTimeString = executionTime.toSimpleString(); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + int serBrokerOffset = inputArg0.getStartOffset(); bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1); - AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di); - - int serSubOffset = inputArg1.getStartOffset(); - bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1); - AOrderedList subs = subSerDes.deserialize(di); - - int resultSetOffset = inputArg2.getStartOffset(); - bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1); - ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di); - String executionTimeString; - try { - executionTimeString = executionTime.toSimpleString(); - } catch (IOException e) { - throw HyracksDataException.create(e); + String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue(); + sendData.putIfAbsent(endpoint, new HashSet<>()); + + if (push) { + int pushOffset = inputArg1.getStartOffset(); + bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1); + //TODO: Right now this creates an object per channel result. 
Need to find a better way to deserialize + pushRecord = recordSerDes.deserialize(di); + sendData.get(endpoint).add(pushRecord.toString()); + + } else { + int serSubOffset = inputArg1.getStartOffset(); + bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1); + pushList = subSerDes.deserialize(di); + addSubscriptions(endpoint, pushList); } - - ChannelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs, - executionTimeString); - } } @Override public void close() throws HyracksDataException { + for (String endpoint : sendData.keySet()) { + if (sendData.get(endpoint).size() > 0) { + sendGroupOfResults(endpoint); + sendData.get(endpoint).clear(); + } + } return; } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java index 0e2be8b..a7f12ba 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java @@ -20,6 +20,7 @@ package org.apache.asterix.bad.runtime; import org.apache.asterix.active.EntityId; +import org.apache.asterix.om.types.IAType; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; @@ -31,16 +32,21 @@ public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory { private static final long serialVersionUID = 1L; private final IScalarEvaluatorFactory brokerEvalFactory; - private final IScalarEvaluatorFactory subEvalFactory; + private final IScalarEvaluatorFactory pushListEvalFactory; private final IScalarEvaluatorFactory channelExecutionEvalFactory; private final EntityId entityId; + private final boolean push; + private final IAType recordType; - public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory, - IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) { + public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, + IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory, + EntityId entityId, boolean push, IAType recordType) { this.brokerEvalFactory = brokerEvalFactory; - this.subEvalFactory = subEvalFactory; + this.pushListEvalFactory = pushListEvalFactory; this.channelExecutionEvalFactory = channelExecutionEvalFactory; this.entityId = entityId; + this.push = push; + this.recordType = recordType; } @Override @@ -50,7 +56,7 @@ public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory { @Override public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, - channelExecutionEvalFactory, entityId) }; + return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, brokerEvalFactory, pushListEvalFactory, + channelExecutionEvalFactory, entityId, push, recordType) }; } } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/resources/lang-extension/lang.txt 
---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt index 02aba78..2d7ba75 100644 --- a/asterix-bad/src/main/resources/lang-extension/lang.txt +++ b/asterix-bad/src/main/resources/lang-extension/lang.txt @@ -101,15 +101,18 @@ CreateChannelStatement ChannelSpecification() throws ParseException: CreateChannelStatement ccs = null; String fqFunctionName = null; Expression period = null; + boolean push = false; } { ( - "repetitive" "channel" nameComponents = QualifiedName() + "repetitive" + ( "push" { push = true; } )? + "channel" nameComponents = QualifiedName() appliedFunction = FunctionSignature() "period" period = FunctionCallExpr() { ccs = new CreateChannelStatement(nameComponents.first, - nameComponents.second, appliedFunction, period); + nameComponents.second, appliedFunction, period, push); } ) { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp new file mode 100644 index 0000000..e20638b --- /dev/null +++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Check a push-based channel plan + * Expected Res : Success + * Date : Mar 2018 + */ + +drop dataverse channels7 if exists; +create dataverse channels7; +use channels7; + +create type UserLocation as { + location: circle, + userName: string, + timeStamp: datetime +}; + + +create type UserLocationFeedType as { + location: circle, + userName: string +}; + +create type EmergencyReport as { + reportId: uuid, + Etype: string, + location: circle, + timeStamp: datetime +}; + +create type EmergencyReportFeedType as { + Etype: string, + location: circle +}; + + +create type EmergencyShelter as { + shelterName: string, + location: point +}; + +create dataset UserLocations(UserLocation) +primary key userName; +create dataset Shelters(EmergencyShelter) +primary key shelterName; +create dataset Reports(EmergencyReport) +primary key reportId autogenerated; + +create index location_time on UserLocations(timeStamp); +create index u_location on UserLocations(location) type RTREE; +create index s_location on Shelters(location) type RTREE; +create index report_time on Reports(timeStamp); + +create function RecentEmergenciesNearUser(userName) { + ( + select report, shelters from + ( select value r from Reports r where r.timeStamp > + current_datetime() - day_time_duration("PT10S"))report, + UserLocations u + let shelters = (select s.location from Shelters s where spatial_intersect(s.location,u.location)) + where u.userName = userName + and spatial_intersect(report.location,u.location) + ) +}; + +write output to nc1:"rttest/channel-push.sqlpp"; + +create repetitive push channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S"); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan new file mode 100644 index 0000000..770617f --- /dev/null +++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan @@ -0,0 +1,64 @@ +-- NOTIFY_BROKERS |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- STREAM_SELECT |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- HYBRID_HASH_JOIN [$$128][$$135] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$128] |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- HYBRID_HASH_JOIN [$$126, $$124][$$117, $$118] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$126, $$124] |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- BROADCAST_EXCHANGE |PARTITIONED| + -- ASSIGN |UNPARTITIONED| + -- EMPTY_TUPLE_SOURCE |UNPARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$117, $$118] |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$135] |PARTITIONED| + -- NESTED_LOOP |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_SELECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- 
ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + -- BROADCAST_EXCHANGE |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$120] |PARTITIONED| + { + -- AGGREGATE |LOCAL| + -- STREAM_SELECT |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STABLE_SORT [$$120(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$120] |PARTITIONED| + -- NESTED_LOOP |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + -- BROADCAST_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| \ No newline at end of file
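
----------------------------------------------------------------------

For reference, a sketch of the broker notification payload implied by the new createData()/sendGroupOfResults() path: NotifyBrokerRuntime now buffers results per broker endpoint across incoming frames and, on close(), issues a single HTTP POST per endpoint. The example below assumes the EmergencyChannel channel from the test above in dataverse channels7; the execution-time key is whatever BADConstants.ChannelExecutionTime resolves to, the UUIDs are placeholders, and the actual string is built on a single line rather than pretty-printed:

    { "dataverseName":"channels7",
      "channelName":"EmergencyChannel",
      "<BADConstants.ChannelExecutionTime>":"<channel execution datetime>",
      "subscriptionIds":["<subscription uuid>","<subscription uuid>"] }

For a push channel, the same array carries the serialized result records (pushRecord.toString()) instead of subscription ids, so brokers receive the channel results directly rather than just the subscription ids and execution time.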