asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject asterixdb-bad git commit: Separate Predistributed Jobs from other Active Jobs
Date Thu, 27 Jul 2017 06:37:09 GMT
Repository: asterixdb-bad
Updated Branches:
  refs/heads/master 4345dffae -> 83f6d53b0


Separate Predistributed Jobs from other Active Jobs

Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/83f6d53b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/83f6d53b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/83f6d53b

Branch: refs/heads/master
Commit: 83f6d53b025a81d8deba750f74b8ce9d03607961
Parents: 4345dff
Author: Till Westmann <tillw@apache.org>
Authored: Wed Jul 26 01:24:33 2017 -0700
Committer: Till Westmann <tillw@apache.org>
Committed: Wed Jul 26 01:24:33 2017 -0700

----------------------------------------------------------------------
 .../bad/lang/BADQueryTranslatorFactory.java     |   3 +-
 .../asterix/bad/lang/BADStatementExecutor.java  |   9 +-
 .../lang/statement/ChannelDropStatement.java    |  15 +-
 .../statement/ChannelSubscribeStatement.java    |   2 +-
 .../statement/ChannelUnsubscribeStatement.java  |   2 +-
 .../lang/statement/CreateChannelStatement.java  |  60 ++++----
 .../statement/CreateProcedureStatement.java     |  43 +++---
 .../statement/ExecuteProcedureStatement.java    |  10 +-
 .../lang/statement/ProcedureDropStatement.java  |  12 +-
 .../bad/metadata/BADMetadataExtension.java      |   4 +-
 .../bad/metadata/BrokerTupleTranslator.java     |   2 +-
 .../bad/metadata/ChannelTupleTranslator.java    |   2 +-
 .../metadata/PrecompiledJobEventListener.java   | 143 +++++++++++++++++--
 .../bad/metadata/ProcedureTupleTranslator.java  |   2 +-
 14 files changed, 209 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index 09a436b..8f5d520 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -33,7 +33,6 @@ public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
     @Override
     public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
-        return new BADStatementExecutor(appCtx, statements, output, compilationProvider, storageComponentProvider,
-                executorService);
+        return new BADStatementExecutor(appCtx, statements, output, compilationProvider, executorService);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 5775e9b..bc17a7d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -28,7 +28,6 @@ import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.Procedure;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
@@ -44,9 +43,8 @@ import org.apache.hyracks.api.client.IHyracksClientConnection;
 public class BADStatementExecutor extends QueryTranslator {
 
     public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
-            ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
-            ExecutorService executorService) {
-        super(appCtx, statements, output, compliationProvider, storageComponentProvider, executorService);
+            ILangCompilationProvider compliationProvider, ExecutorService executorService) {
+        super(appCtx, statements, output, compliationProvider, executorService);
     }
 
     @Override
@@ -59,8 +57,7 @@ public class BADStatementExecutor extends QueryTranslator {
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
         List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
-        MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
-                metadataProvider.getStorageComponentProvider());
+        MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
         tempMdProvider.setConfig(metadataProvider.getConfig());
         for (Broker broker : brokers) {
             tempMdProvider.getLocks().reset();

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 1b655da..907bd0e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -18,10 +18,9 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
@@ -91,10 +90,9 @@ public class ChannelDropStatement implements IExtensionStatement {
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse,
channelName.getValue());
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
-        PrecompiledJobEventListener listener =
-                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
         Channel channel = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -115,14 +113,13 @@ public class ChannelDropStatement implements IExtensionStatement {
             listener.getExecutorService().shutdownNow();
             JobId hyracksJobId = listener.getJobId();
             listener.deActivate();
-            activeEventHandler.removeListener(listener);
+            activeEventHandler.unregisterListener(listener);
             if (hyracksJobId != null) {
                 hcc.destroyJob(hyracksJobId);
             }
 
             //Create a metadata provider to use in nested jobs.
-            MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             //Drop the Channel Datasets
             //TODO: Need to find some way to handle if this fails.

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 5bf0690..df8dab1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -186,7 +186,7 @@ public class ChannelSubscribeStatement implements IExtensionStatement {
             subscriptionTuple.setVarCounter(varCounter);
 
             MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
+                    metadataProvider.getDefaultDataverse());
             tempMdProvider.setConfig(metadataProvider.getConfig());
 
             if (subscriptionId == null) {

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 60de69e..28d09df 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -144,7 +144,7 @@ public class ChannelUnsubscribeStatement implements IExtensionStatement {
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
             delete.accept(visitor, null);
             MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
+                    metadataProvider.getDefaultDataverse());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete,
hcc, false);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 7d91d2e..a43020f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -21,15 +21,17 @@ package org.apache.asterix.bad.lang.statement;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.StringReader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.ChannelJobService;
@@ -42,6 +44,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.lang.common.base.Expression;
@@ -56,7 +59,6 @@ import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
 import org.apache.asterix.lang.common.statement.SetStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -67,7 +69,6 @@ import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
@@ -131,11 +132,13 @@ public class CreateChannelStatement implements IExtensionStatement {
         return channelResultsInsertQuery;
     }
 
-    @Override public byte getCategory() {
+    @Override
+    public byte getCategory() {
         return Category.DDL;
     }
 
-    @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException
{
         return null;
     }
 
@@ -160,7 +163,8 @@ public class CreateChannelStatement implements IExtensionStatement {
 
     }
 
-    @Override public byte getKind() {
+    @Override
+    public byte getKind() {
         return Kind.EXTENSION;
     }
 
@@ -188,14 +192,13 @@ public class CreateChannelStatement implements IExtensionStatement {
         fieldNames.add(BADConstants.ResultId);
         partitionFields.add(fieldNames);
         idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
-        DatasetDecl createResultsDataset =
-                new DatasetDecl(new Identifier(dataverse), resultsName, new Identifier(BADConstants.BAD_DATAVERSE_NAME),
-                        resultsTypeName, null, null, null, null, new HashMap<String, String>(),
-                        new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+        DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
+                new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null,
null, null,
+                new HashMap<String, String>(), new HashMap<String, String>(),
DatasetType.INTERNAL, idd, true);
 
         //Run both statements to create datasets
-        ((QueryTranslator) statementExecutor)
-                .handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc);
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
createSubscriptionsDataset,
+                hcc);
         metadataProvider.getLocks().reset();
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
createResultsDataset, hcc);
 
@@ -231,9 +234,8 @@ public class CreateChannelStatement implements IExtensionStatement {
         SetStatement ss = (SetStatement) fStatements.get(0);
         metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
 
-        return ((QueryTranslator) statementExecutor)
-                .handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, hdc,
ResultDelivery.ASYNC, null,
-                        stats, true, null, null);
+        return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
fStatements.get(1),
+                hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null);
     }
 
     private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection
hcc,
@@ -244,15 +246,15 @@ public class CreateChannelStatement implements IExtensionStatement {
             if (predistributed) {
                 jobId = hcc.distributeJob(channeljobSpec);
             }
-            ScheduledExecutorService ses = ChannelJobService
-                    .startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), jobId, hcc,
-                            ChannelJobService.findPeriod(duration));
+            ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
+                    jobId, hcc, ChannelJobService.findPeriod(duration));
             listener.storeDistributedInfo(jobId, ses, null);
         }
 
     }
 
-    @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
 
@@ -271,10 +273,9 @@ public class CreateChannelStatement implements IExtensionStatement {
         Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse,
channelName.getValue());
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
-        PrecompiledJobEventListener listener =
-                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
         boolean alreadyActive = false;
         Channel channel = null;
 
@@ -287,7 +288,7 @@ public class CreateChannelStatement implements IExtensionStatement {
                 throw new AlgebricksException("A channel with this name " + channelName +
" already exists.");
             }
             if (listener != null) {
-                alreadyActive = listener.isEntityActive();
+                alreadyActive = listener.isActive();
             }
             if (alreadyActive) {
                 throw new AsterixException("Channel " + channelName + " is already running");
@@ -304,15 +305,14 @@ public class CreateChannelStatement implements IExtensionStatement {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
             MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
+                    metadataProvider.getDefaultDataverse());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             //Create Channel Datasets
             createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider,
hcc, hdc, dataverse);
             tempMdProvider.getLocks().reset();
             //Create Channel Internal Job
-            JobSpecification channeljobSpec =
-                    createChannelJob(statementExecutor, subscriptionsName, resultsName, tempMdProvider,
hcc, hdc, stats,
-                            dataverse);
+            JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName,
resultsName,
+                    tempMdProvider, hcc, hdc, stats, dataverse);
 
             // Now we subscribe
             if (listener == null) {

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index d203905..1c497a8 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -26,10 +26,9 @@ import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
@@ -41,6 +40,7 @@ import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
@@ -52,7 +52,6 @@ import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -96,7 +95,8 @@ public class CreateProcedureStatement implements IExtensionStatement {
         this.period = (CallExpr) period;
     }
 
-    @Override public byte getKind() {
+    @Override
+    public byte getKind() {
         return Kind.EXTENSION;
     }
 
@@ -108,7 +108,8 @@ public class CreateProcedureStatement implements IExtensionStatement {
         return signature;
     }
 
-    @Override public byte getCategory() {
+    @Override
+    public byte getCategory() {
         return Category.DDL;
     }
 
@@ -116,7 +117,8 @@ public class CreateProcedureStatement implements IExtensionStatement {
         return period;
     }
 
-    @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException
{
         return null;
     }
 
@@ -167,9 +169,10 @@ public class CreateProcedureStatement implements IExtensionStatement
{
             throw new CompilationException("Procedure can only execute a single statement");
         }
         if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
-            return new Pair<>(((QueryTranslator) statementExecutor)
-                    .handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc,
hdc, ResultDelivery.ASYNC,
-                            null, stats, true, null, null), PrecompiledType.INSERT);
+            return new Pair<>(
+                    ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+                            fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, null, stats,
true, null, null),
+                    PrecompiledType.INSERT);
         } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
             Pair<JobSpecification, PrecompiledType> pair =
                     new Pair<>(compileQueryJob(statementExecutor, metadataProvider,
hcc, (Query) fStatements.get(0)),
@@ -179,8 +182,8 @@ public class CreateProcedureStatement implements IExtensionStatement {
         } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
             fStatements.get(0).accept(visitor, null);
-            return new Pair<>(((QueryTranslator) statementExecutor)
-                    .handleDeleteStatement(metadataProvider, fStatements.get(0), hcc, true),
PrecompiledType.DELETE);
+            return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
+                    fStatements.get(0), hcc, true), PrecompiledType.DELETE);
         } else {
             throw new CompilationException("Procedure can only execute a single delete, insert,
or query");
         }
@@ -193,18 +196,18 @@ public class CreateProcedureStatement implements IExtensionStatement
{
         listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
     }
 
-    @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         initialize();
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
-        PrecompiledJobEventListener listener =
-                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
         boolean alreadyActive = false;
         Procedure procedure = null;
 
@@ -212,13 +215,13 @@ public class CreateProcedureStatement implements IExtensionStatement
{
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            procedure = BADLangExtension
-                    .getProcedure(mdTxnCtx, dataverse, signature.getName(), Integer.toString(signature.getArity()));
+            procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
+                    Integer.toString(signature.getArity()));
             if (procedure != null) {
                 throw new AlgebricksException("A procedure with this name " + signature.getName()
+ " already exists.");
             }
             if (listener != null) {
-                alreadyActive = listener.isEntityActive();
+                alreadyActive = listener.isActive();
             }
             if (alreadyActive) {
                 throw new AsterixException("Procedure " + signature.getName() + " is already
running");
@@ -228,7 +231,7 @@ public class CreateProcedureStatement implements IExtensionStatement {
                     Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
 
             MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
+                    metadataProvider.getDefaultDataverse());
             tempMdProvider.setConfig(metadataProvider.getConfig());
 
             metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 9bf3718..e60570c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -21,11 +21,10 @@ package org.apache.asterix.bad.lang.statement;
 import java.util.EnumSet;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
@@ -95,13 +94,12 @@ public class ExecuteProcedureStatement implements IExtensionStatement {
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
-        PrecompiledJobEventListener listener =
-                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index f7c3a74..a1bf0d3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -18,10 +18,9 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
@@ -82,16 +81,15 @@ public class ProcedureDropStatement implements IExtensionStatement {
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         FunctionSignature signature = getFunctionSignature();
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
         signature.setNamespace(dataverse);
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
-        PrecompiledJobEventListener listener =
-                (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -115,7 +113,7 @@ public class ProcedureDropStatement implements IExtensionStatement {
             }
             JobId hyracksJobId = listener.getJobId();
             listener.deActivate();
-            activeEventHandler.removeListener(listener);
+            activeEventHandler.unregisterListener(listener);
             if (hyracksJobId != null) {
                 hcc.destroyJob(hyracksJobId);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd4470f..cd2ff86 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -42,8 +42,8 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class BADMetadataExtension implements IMetadataExtension {
 
-    public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId(
-            BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
+    public static final ExtensionId BAD_METADATA_EXTENSION_ID =
+            new ExtensionId(BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
     public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
             NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
index d3d2e66..de1aab8 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -19,8 +19,8 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
 import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.AString;

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index e6d0249..d577260 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -19,9 +19,9 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
 import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.AString;

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
index 5eb18d1..8e0cf5f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
@@ -18,27 +18,30 @@
  */
 package org.apache.asterix.bad.metadata;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.IActiveEntityEventSubscriber;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-public class PrecompiledJobEventListener extends ActiveEntityEventsListener {
-    private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
+public class PrecompiledJobEventListener implements IActiveEntityEventsListener {
 
-    private ScheduledExecutorService executorService = null;
-    private ResultReader resultReader;
+    private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
 
     public enum PrecompiledType {
         CHANNEL,
@@ -47,15 +50,119 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener {
         DELETE
     }
 
+    enum RequestState {
+        INIT,
+        STARTED,
+        FINISHED
+    }
+
+    private ScheduledExecutorService executorService = null;
+    private ResultReader resultReader;
     private final PrecompiledType type;
+    // members
+    protected volatile ActivityState state;
+    protected JobId jobId;
+    protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
+    protected final ICcApplicationContext appCtx;
+    protected final EntityId entityId;
+    protected final List<IDataset> datasets;
+    protected final ActiveEvent statsUpdatedEvent;
+    protected long statsTimestamp;
+    protected String stats;
+    protected RequestState statsRequestState;
+    protected final String runtimeName;
+    protected final AlgebricksAbsolutePartitionConstraint locations;
+    protected int numRegistered;
 
     public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
             List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
-        super(appCtx, entityId, datasets, locations, runtimeName);
+        this.appCtx = appCtx;
+        this.entityId = entityId;
+        this.datasets = datasets;
+        this.state = ActivityState.STOPPED;
+        this.statsTimestamp = -1;
+        this.statsRequestState = RequestState.INIT;
+        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null);
+        this.stats = "{\"Stats\":\"N/A\"}";
+        this.runtimeName = runtimeName;
+        this.locations = locations;
+        this.numRegistered = 0;
         state = ActivityState.STOPPED;
         this.type = type;
     }
 
+    protected synchronized void handle(ActivePartitionMessage message) {
+        if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+            numRegistered++;
+            if (numRegistered == locations.getLocations().length) {
+                state = ActivityState.RUNNING;
+            }
+        }
+    }
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    @Override
+    public ActivityState getState() {
+        return state;
+    }
+
+    @Override
+    public boolean isEntityUsingDataset(IDataset dataset) {
+        return datasets.contains(dataset);
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public String getStats() {
+        return stats;
+    }
+
+    @Override
+    public long getStatsTimeStamp() {
+        return statsTimestamp;
+    }
+
+    public String formatStats(List<String> responses) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append("{\"Stats\": [").append(responses.get(0));
+        for (int i = 1; i < responses.size(); i++) {
+            strBuilder.append(", ").append(responses.get(i));
+        }
+        strBuilder.append("]}");
+        return strBuilder.toString();
+    }
+
+    protected synchronized void notifySubscribers(ActiveEvent event) {
+        notifyAll();
+        Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
+        while (it.hasNext()) {
+            IActiveEntityEventSubscriber subscriber = it.next();
+            if (subscriber.isDone()) {
+                it.remove();
+            } else {
+                try {
+                    subscriber.notify(event);
+                } catch (HyracksDataException e) {
+                    LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
+                }
+                if (subscriber.isDone()) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    public AlgebricksAbsolutePartitionConstraint getLocations() {
+        return locations;
+    }
+
     public ResultReader getResultReader() {
         return resultReader;
     }
@@ -74,10 +181,6 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener {
         return executorService;
     }
 
-    public boolean isEntityActive() {
-        return state == ActivityState.STARTED;
-    }
-
     public void deActivate() {
         state = ActivityState.STOPPED;
     }
@@ -110,7 +213,7 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Channel Job started for  " + entityId);
         }
-        state = ActivityState.STARTED;
+        state = ActivityState.RUNNING;
     }
 
     private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
@@ -120,7 +223,21 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener {
     }
 
     @Override
-    public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException {
+    public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
         // no op
     }
+
+    @Override
+    public boolean isActive() {
+        return state == ActivityState.RUNNING;
+    }
+
+    @Override
+    public void unregister() throws HyracksDataException {
+    }
+
+    @Override
+    public Exception getJobFailure() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
index f324a46..1aa633f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -26,8 +26,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.ARecord;


Mime
View raw message