asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Steven Jacobs (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb-bad[master]: Added long-term test to verify pre-distributed jobs fix
Date Wed, 05 Apr 2017 00:54:11 GMT
Steven Jacobs has submitted this change and it was merged.

Change subject: Added long-term test to verify pre-distributed jobs fix
......................................................................


Added long-term test to verify pre-distributed jobs fix

Fixed bug from master change to DeleteStatement
Fixed Lock Management in BAD

Change-Id: I99e799e203f6ca6082f9c90f04e606c436eb00ee
---
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
A asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.aql
A asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.2.update.aql
A asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.3.update.aql
A asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.aql
A asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.aql
A asterix-bad/src/test/resources/runtimets/results/channel/ten_minute_channel/ten_minute_channel.5.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
18 files changed, 281 insertions(+), 53 deletions(-)

Approvals:
  Xikui Wang: Looks good to me, approved
  Jenkins: Verified



diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 9be02f6..5cc6444 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -59,21 +59,27 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
         List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
+        MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                metadataProvider.getStorageComponentProvider());
+        tempMdProvider.setConfig(metadataProvider.getConfig());
         for (Broker broker : brokers) {
+            tempMdProvider.getLocks().reset();
             BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()),
false);
-            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+            drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
         }
         List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
         for (Channel channel : channels) {
+            tempMdProvider.getLocks().reset();
             ChannelDropStatement drop = new ChannelDropStatement(dvId,
                     new Identifier(channel.getChannelId().getEntityName()), false);
-            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+            drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
         }
         List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
         for (Procedure procedure : procedures) {
+            tempMdProvider.getLocks().reset();
             ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
                     procedure.getEntityId().getEntityName(), procedure.getArity()), false);
-            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+            drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
         }
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
index 018e211..5b81903 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -94,6 +94,8 @@
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 89b0e9a..854ae07 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -116,16 +116,20 @@
                 hcc.destroyJob(hyracksJobId);
             }
 
+            //Create a metadata provider to use in nested jobs.
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
             //Drop the Channel Datasets
             //TODO: Need to find some way to handle if this fails.
             //TODO: Prevent datasets for Channels from being dropped elsewhere
             DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
                     new Identifier(channel.getResultsDatasetName()), true);
-            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider,
dropStmt, hcc);
-
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider,
dropStmt, hcc);
+            tempMdProvider.getLocks().reset();
             dropStmt = new DropDatasetStatement(new Identifier(dataverse),
                     new Identifier(channel.getSubscriptionsDataset()), true);
-            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider,
dropStmt, hcc);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider,
dropStmt, hcc);
 
 
             //Remove the Channel Metadata
@@ -137,6 +141,8 @@
                 QueryTranslator.abort(e, e, mdTxnCtx);
             }
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index bb3cef2..5c5cdf1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -189,8 +189,12 @@
 
             subscriptionTuple.setVarCounter(varCounter);
 
-            if (subscriptionId == null) {
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
 
+            if (subscriptionId == null) {
+                //To create a new subscription
                 VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub",
1));
                 VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub",
1));
                 VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
@@ -204,17 +208,25 @@
                 FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar);
 
                 metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
-                metadataProvider.setResultAsyncMode(
-                        resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
+                boolean resultsAsync =
+                        resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
+                metadataProvider.setResultAsyncMode(resultsAsync);
+                tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
+                tempMdProvider.setResultAsyncMode(resultsAsync);
+                tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
+                tempMdProvider
+                        .setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
+                tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
+
                 InsertStatement insert = new InsertStatement(new Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter,
resultVar,
- body);
-                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
insert, hcc, hdc,
+                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter,
resultVar, body);
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider,
insert, hcc, hdc,
                         resultDelivery, stats, false, null, null);
             } else {
+                //To update an existing subscription
                 UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
                         new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter,
null, null);
-                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
upsert, hcc, hdc,
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider,
upsert, hcc, hdc,
                         resultDelivery, stats, false, null, null);
             }
 
@@ -222,6 +234,8 @@
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
 
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 1558508..538a5ea 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -60,18 +60,14 @@
     private final String subscriptionId;
     private final int varCounter;
     private VariableExpr vars;
-    private List<String> dataverses;
-    private List<String> datasets;
 
     public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier
channelName,
-            String subscriptionId, int varCounter, List<String> dataverses, List<String>
datasets) {
+            String subscriptionId, int varCounter) {
         this.vars = vars;
         this.channelName = channelName;
         this.dataverseName = dataverseName;
         this.subscriptionId = subscriptionId;
         this.varCounter = varCounter;
-        this.dataverses = dataverses;
-        this.datasets = datasets;
     }
 
     public Identifier getDataverseName() {
@@ -88,14 +84,6 @@
 
     public String getsubScriptionId() {
         return subscriptionId;
-    }
-
-    public List<String> getDataverses() {
-        return dataverses;
-    }
-
-    public List<String> getDatasets() {
-        return datasets;
     }
 
     public int getVarCounter() {
@@ -152,15 +140,19 @@
             condition.addOperand(UUIDCall);
 
             DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
-                    new Identifier(subscriptionsDatasetName), condition, varCounter, dataverses,
datasets);
+                    new Identifier(subscriptionsDatasetName), condition, varCounter);
             AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
             delete.accept(visitor, null);
-
-            ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
delete, hcc, false);
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
+            ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete,
hcc, false);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 }
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
index 2c60a9d..b4f3eae 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -101,6 +101,8 @@
             }
             LOGGER.log(Level.WARNING, "Failed creating a broker", e);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 }
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index a5e9c7c..f138d4f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -197,6 +197,7 @@
         //Run both statements to create datasets
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
createSubscriptionsDataset,
                 hcc);
+        metadataProvider.getLocks().reset();
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
createResultsDataset, hcc);
 
     }
@@ -299,14 +300,16 @@
             if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue())
!= null) {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
-
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
             //Create Channel Datasets
-            createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider,
hcc, hdc, stats,
+            createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider,
hcc, hdc, stats,
                     dataverse);
-
+            tempMdProvider.getLocks().reset();
             //Create Channel Internal Job
             JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName,
resultsName,
-                    metadataProvider, hcc, hdc, stats, dataverse);
+                    tempMdProvider, hcc, hdc, stats, dataverse);
 
             // Now we subscribe
             if (listener == null) {
@@ -332,6 +335,8 @@
             }
             LOGGER.log(Level.WARNING, "Failed creating a channel", e);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
 
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 7a1bc54..1db9b26 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -140,10 +140,29 @@
         durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
     }
 
+    private JobSpecification compileQueryJob(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
+            IHyracksClientConnection hcc, Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = null;
+        try {
+            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider,
q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, e.getMessage(), e);
+            if (bActiveTxn) {
+                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        }
+        return jobSpec;
+    }
+
     private Pair<JobSpecification, PrecompiledType> createProcedureJob(String body,
-            IStatementExecutor statementExecutor,
-            MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset
hdc, Stats stats)
-                    throws Exception {
+            IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection
hcc,
+            IHyracksDataset hdc, Stats stats) throws Exception {
         StringBuilder builder = new StringBuilder();
         builder.append(body);
         builder.append(";");
@@ -157,8 +176,11 @@
                             fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, stats, true,
null, null),
                     PrecompiledType.INSERT);
         } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
-            return new Pair<>(((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc,
metadataProvider,
-                    (Query) fStatements.get(0), null), PrecompiledType.QUERY);
+            Pair<JobSpecification, PrecompiledType> pair =
+                    new Pair<>(compileQueryJob(statementExecutor, metadataProvider,
hcc, (Query) fStatements.get(0)),
+                            PrecompiledType.QUERY);
+            metadataProvider.getLocks().unlock();
+            return pair;
         } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
             AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
             fStatements.get(0).accept(visitor, null);
@@ -170,10 +192,10 @@
     }
 
     private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection
hcc,
-            PrecompiledJobEventListener listener, MetadataProvider metadataProvider, IHyracksDataset
hdc, Stats stats)
+            PrecompiledJobEventListener listener, ResultSetId resultSetId, IHyracksDataset
hdc, Stats stats)
                     throws Exception {
         JobId jobId = hcc.distributeJob(jobSpec);
-        listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, metadataProvider.getResultSetId()));
+        listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
     }
 
     @Override
@@ -211,12 +233,22 @@
             procedure = new Procedure(dataverse, signature.getName(), signature.getArity(),
getParamList(),
                     Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
 
-            metadataProvider.setResultSetId(new ResultSetId(0));
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
+
             metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+            boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC || resultDelivery
== ResultDelivery.DEFERRED;
+            metadataProvider.setResultAsyncMode(resultsAsync);
+            tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
+            tempMdProvider.setResultAsyncMode(resultsAsync);
+            tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
+            tempMdProvider.setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
+            tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
 
             //Create Procedure Internal Job
             Pair<JobSpecification, PrecompiledType> procedureJobSpec =
-                    createProcedureJob(getFunctionBody(), statementExecutor, metadataProvider,
hcc, hdc, stats);
+                    createProcedureJob(getFunctionBody(), statementExecutor, tempMdProvider,
hcc, hdc, stats);
 
             // Now we subscribe
             if (listener == null) {
@@ -224,8 +256,8 @@
                 listener = new PrecompiledJobEventListener(entityId, procedureJobSpec.second,
new ArrayList<>());
                 ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
             }
-
-            setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, metadataProvider,
hdc, stats);
+            setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(),
hdc,
+                    stats);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -235,6 +267,8 @@
             }
             LOGGER.log(Level.WARNING, "Failed creating a procedure", e);
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
 
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 7bd00c1..1a319a1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -136,6 +136,8 @@
                 QueryTranslator.abort(e, e, mdTxnCtx);
             }
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 9fe8a83..abdf90a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -125,6 +125,8 @@
                 QueryTranslator.abort(e, e, mdTxnCtx);
             }
             throw new HyracksDataException(e);
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 2001988..adce6ed 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -226,19 +226,12 @@
    }
    | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
       {
-        setDataverses(new ArrayList<String>());
-        setDatasets(new ArrayList<String>());
         VariableExpr varExp = new VariableExpr();
         VarIdentifier var = new VarIdentifier();
         varExp.setVar(var);
         var.setValue("$subscriptionPlaceholder");
         getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
-        List<String> dataverses = getDataverses();
-        List<String> datasets = getDatasets();
-        // we remove the pointer to the dataverses and datasets
-        setDataverses(null);
-        setDatasets(null);
-        stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second,
id, getVarCounter(), dataverses, datasets);
+        stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second,
id, getVarCounter());
       }
      | "change" "subscription" subscriptionId = StringLiteral()  <ON> nameComponents
= QualifiedName()
        <LEFTPAREN> (tmp = Expression()
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.aql
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.aql
new file mode 100644
index 0000000..e8f5d56
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.aql
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Room Occupants Long Term Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type userLocation as {
+  userId: int,
+  roomNumber: int
+}
+
+create dataset UserLocations(userLocation)
+primary key userId;
+
+create function RoomOccupants($room) {
+    for $location in dataset UserLocations
+    where $location.roomNumber = $room
+    return $location.userId
+};
+
+create broker brokerA at "http://www.notifyA.com";
+
+create repetitive channel roomRecords using RoomOccupants@1 period duration("PT1S");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.2.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.2.update.aql
new file mode 100644
index 0000000..e253265
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.2.update.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+
+use dataverse channels;
+
+subscribe to roomRecords (123) on brokerA;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.3.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.3.update.aql
new file mode 100644
index 0000000..66d6a7d
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.3.update.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+
+use dataverse channels;
+
+
+upsert into dataset UserLocations([
+{"userId":1, "roomNumber":123},
+{"userId":2, "roomNumber":222},
+{"userId":3, "roomNumber":350}]
+);
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.aql
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.aql
new file mode 100644
index 0000000..12bc053
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+600000
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.aql
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.aql
new file mode 100644
index 0000000..55e044b
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Room Occupants Test
+* Expected Res : Success
+* Date         : Sep 2016
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+
+count (from $result in dataset roomRecordsResults
+order by $result.result
+select $result.result) > 599;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/ten_minute_channel/ten_minute_channel.5.adm
b/asterix-bad/src/test/resources/runtimets/results/channel/ten_minute_channel/ten_minute_channel.5.adm
new file mode 100644
index 0000000..f32a580
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/ten_minute_channel/ten_minute_channel.5.adm
@@ -0,0 +1 @@
+true
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 67daa0d..ae35454 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -66,10 +66,15 @@
             <output-dir compare="Text">drop_channel_check_metadata</output-dir>
         </compilation-unit>
     </test-case>
-            <test-case FilePath="channel">
+    <test-case FilePath="channel">
         <compilation-unit name="subscribe_channel_check_subscriptions">
             <output-dir compare="Text">subscribe_channel_check_subscriptions</output-dir>
         </compilation-unit>
     </test-case>
+    <test-case FilePath="channel">
+        <compilation-unit name="ten_minute_channel">
+            <output-dir compare="Text">ten_minute_channel</output-dir>
+        </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I99e799e203f6ca6082f9c90f04e606c436eb00ee
Gerrit-PatchSet: 9
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco002@ucr.edu>
Gerrit-Reviewer: Ildar Absalyamov <ildar.absalyamov@gmail.com>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sjaco002@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xkkwww@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message