asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Steven Jacobs (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb-bad[master]: Updated to match code changes to asterix
Date Thu, 29 Sep 2016 21:47:19 GMT
Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1227

Change subject: Updated to match code changes to asterix
......................................................................

Updated to match code changes to asterix

Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a
---
M src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
M src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
5 files changed, 40 insertions(+), 20 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/27/1227/1

diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 7d0cb1a..21a3ef0 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -29,16 +29,22 @@
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.expression.FLWOGRExpression;
+import org.apache.asterix.lang.common.base.Clause;
 import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
 import org.apache.asterix.lang.common.expression.FieldBinding;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.literal.StringLiteral;
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.UpsertStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -182,18 +188,32 @@
             subscriptionTuple.setVarCounter(varCounter);
 
             if (subscriptionId == null) {
-                List<String> returnField = new ArrayList<>();
-                returnField.add(BADConstants.SubscriptionId);
+
+                VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub",
1));
+                VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub",
1));
+                VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
+                VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result",
0));
+                useResultVar.setIsNewVar(false);
+                useSubscriptionVar.setIsNewVar(false);
+                Query returnQuery = new Query(false);
+                List<Clause> clauseList = new ArrayList<>();
+                LetClause let = new LetClause(subscriptionVar,
+                        new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId)));
+                clauseList.add(let);
+                FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar);
+                returnQuery.setBody(body);
+
                 metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
                 metadataProvider.setResultAsyncMode(
                         resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
                 InsertStatement insert = new InsertStatement(new Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter,
false, returnField);
+                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter,
resultVar,
+                        returnQuery);
                 ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
insert, hcc, hdc,
                         resultDelivery, stats, false);
             } else {
                 UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter);
+                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter,
null, null);
                 ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
upsert, hcc, hdc,
                         resultDelivery, stats, false);
             }
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 824e725..9c99a9f 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -67,7 +67,7 @@
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.om.base.temporal.ADurationParserFactory;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -199,7 +199,7 @@
         RepetitiveChannelOperatorDescriptor channelOp = new RepetitiveChannelOperatorDescriptor(jobSpec,
dataverse,
                 channelName, duration, channeljobSpec, strIP, port);
 
-        String partition = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations()[0];
+        String partition = ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0];
         Set<String> ncs = new HashSet<>(Arrays.asList(partition));
         AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
                 ncs.toArray(new String[ncs.size()]));
@@ -246,7 +246,7 @@
             IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
         StringBuilder builder = new StringBuilder();
         builder.append("insert into dataset " + dataverse + "." + resultsName + " ");
-        builder.append(" (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime()
\n");
+        builder.append(" as $a (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime()
\n");
 
         builder.append("for $sub in dataset " + dataverse + "." + subscriptionsName + "\n");
         builder.append(
@@ -266,7 +266,7 @@
         builder.append("\"result\":$result");
         builder.append("}");
         builder.append(")");
-        builder.append(" return records");
+        builder.append(" returning $a");
         builder.append(";");
         AQLParserFactory aqlFact = new AQLParserFactory();
         List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
diff --git a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 8e19fc0..374bae2 100644
--- a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -51,7 +51,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -75,10 +75,10 @@
             return false;
         }
         AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return false;
         }
-        ExtensionOperator eOp = (ExtensionOperator) op;
+        DelegateOperator eOp = (DelegateOperator) op;
         if (!(eOp.getDelegate() instanceof CommitOperator)) {
             return false;
         }
@@ -140,7 +140,7 @@
         context.computeAndSetTypeEnvironmentForOperator(badProject);
 
         //Create my brokerNotify plan above the extension Operator
-        ExtensionOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar,
channelExecutionVar,
+        DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar,
channelExecutionVar,
                 context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
 
         opRef.setValue(dOp);
@@ -148,7 +148,7 @@
         return true;
     }
 
-    private ExtensionOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
+    private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
             LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext
context,
             ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse,
String channelName)
                     throws AlgebricksException {
@@ -196,7 +196,7 @@
         EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse,
channelName);
         NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
         notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
-        ExtensionOperator extensionOp = new ExtensionOperator(notifyBrokerOp);
+        DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
         extensionOp.setPhysicalOperator(notifyBrokerPOp);
         extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
 
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index c680988..d281b49 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -22,14 +22,14 @@
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
 /**
  * A repetitive channel operator, which uses a Java timer to run a given query periodically
  */
-public class NotifyBrokerOperator extends AbstractExtensibleLogicalOperator {
+public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
     private final LogicalVariable subscriptionIdVar;
     private final LogicalVariable brokerEndpointVar;
     private final LogicalVariable channelExecutionVar;
@@ -64,7 +64,7 @@
     }
 
     @Override
-    public IOperatorExtension newInstance() {
+    public IOperatorDelegate newInstance() {
         return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
     }
 
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index 753ece7..7d0e044 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -73,7 +73,7 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context,
ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema
outerPlanSchema)
                     throws AlgebricksException {
-        ExtensionOperator notify = (ExtensionOperator) op;
+        DelegateOperator notify = (DelegateOperator) op;
         LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
         LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
         LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1227
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco002@ucr.edu>

Mime
View raw message