asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [2/4] asterixdb git commit: Enhanced Insert AQL
Date Sun, 16 Oct 2016 18:08:27 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f4ef1a1..8a25888 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -65,19 +65,19 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.api.IFeed.FeedType;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedEventsListener;
 import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.management.FeedLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -342,7 +342,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         break;
                     case Statement.Kind.INSERT:
                     case Statement.Kind.UPSERT:
-                        handleInsertUpsertStatement(metadataProvider, stmt, hcc);
+                        if (((InsertStatement) stmt).getReturnQuery() != null) {
+                            metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+                            metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
+                                    || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                        }
+                        handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false);
                         break;
                     case Statement.Kind.DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc);
@@ -393,7 +398,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         // No op
                         break;
                     case Statement.Kind.EXTENSION:
-                        ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc);
+                        ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc, hdc, resultDelivery, stats,
+                                resultSetIdCounter);
                         break;
                     default:
                         throw new AsterixException("Unknown function");
@@ -506,7 +512,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws AsterixException, Exception {
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
         DatasetDecl dd = (DatasetDecl) stmt;
@@ -602,8 +608,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     break;
                 case EXTERNAL:
                     String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getProperties();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
 
                     datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
                             ExternalDatasetTransactionState.COMMIT);
@@ -705,7 +710,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         StringBuilder builder = null;
         IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
-            if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+            if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                 if (builder == null) {
                     builder = new StringBuilder();
                 }
@@ -730,8 +735,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse,
-            MetadataTransactionContext mdTxnCtx)
+    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx)
             throws AsterixException {
         int nodegroupCardinality;
         String nodegroupName;
@@ -984,8 +988,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
             ARecordType enforcedType = null;
             if (stmtCreateIndex.isEnforced()) {
-                enforcedType = createEnforcedType(aRecordType,
-                        Lists.newArrayList(index));
+                enforcedType = createEnforcedType(aRecordType, Lists.newArrayList(index));
             }
 
             // #. prepare to create the index artifact in NC.
@@ -1350,7 +1353,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
     }
 
-    protected void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
@@ -1369,8 +1372,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName
-                            + " in dataverse " + dataverseName + ".");
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                            + dataverseName + ".");
                 }
             }
 
@@ -1424,7 +1427,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                     throw new AsterixException(
                             "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
                 }
@@ -1482,8 +1485,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 } else {
                     CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                             indexes.get(j).getIndexName());
-                    jobsToExecute.add(
-                            ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
+                    jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
                 }
             }
 
@@ -1547,7 +1549,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
-                if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                     if (builder == null) {
                         builder = new StringBuilder();
                     }
@@ -1555,9 +1557,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 }
             }
             if (builder != null) {
-                throw new AsterixException(
-                        "Dataset" + datasetName + " is currently being fed into by the following active entities: "
-                                + builder.toString());
+                throw new AsterixException("Dataset" + datasetName
+                        + " is currently being fed into by the following active entities: " + builder.toString());
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1578,11 +1579,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(),
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                IMetadataEntity.PENDING_DROP_OP));
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1644,11 +1643,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(),
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                IMetadataEntity.PENDING_DROP_OP));
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1707,9 +1704,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending index("
-                            + dataverseName + "." + datasetName + "."
-                            + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
+                            + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1748,8 +1744,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws Exception {
+    protected void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
         NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
         String nodegroupName = stmtDelete.getNodeGroupName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1859,40 +1854,61 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleInsertUpsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+    public JobSpecification handleInsertUpsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
+            IStatementExecutor.Stats stats, boolean compileOnly) throws Exception {
 
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
+
         String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
         Query query = stmtInsertUpsert.getQuery();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        if (stmtInsertUpsert.getReturnQuery() != null) {
+            if (!stmtInsertUpsert.getReturnQuery().getDatasets().isEmpty()) {
+                throw new AsterixException("Cannot use datasets in an insert returning query");
+            }
+            // returnQuery Rewriting (happens under the same ongoing metadata transaction)
+            Pair<Query, Integer> rewrittenReturnQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider,
+                    stmtInsertUpsert.getReturnQuery(), sessionConfig);
+
+            stmtInsertUpsert.getQuery().setVarCounter(rewrittenReturnQuery.first.getVarCounter());
+            stmtInsertUpsert.setRewrittenReturnQuery(rewrittenReturnQuery.first);
+            stmtInsertUpsert.addToVarCounter(rewrittenReturnQuery.second);
+        }
+
         MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
                 dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets());
-
+        JobSpecification compiled = null;
         try {
             metadataProvider.setWriteTransaction(true);
             CompiledInsertStatement clfrqs = null;
             switch (stmtInsertUpsert.getKind()) {
                 case Statement.Kind.INSERT:
                     clfrqs = new CompiledInsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(),
-                            query, stmtInsertUpsert.getVarCounter());
+                            query, stmtInsertUpsert.getVarCounter(), stmtInsertUpsert.getVar(),
+                            stmtInsertUpsert.getReturnQuery());
                     break;
                 case Statement.Kind.UPSERT:
                     clfrqs = new CompiledUpsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(),
-                            query, stmtInsertUpsert.getVarCounter());
+                            query, stmtInsertUpsert.getVarCounter(), stmtInsertUpsert.getVar(),
+                            stmtInsertUpsert.getReturnQuery());
                     break;
                 default:
                     throw new AlgebricksException("Unsupported statement type " + stmtInsertUpsert.getKind());
             }
-            JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
+            compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            if (compiled != null) {
-                JobUtils.runJob(hcc, compiled, true);
+            if (compiled != null && !compileOnly) {
+                if (stmtInsertUpsert.getReturnQuery() != null) {
+                    handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats);
+                } else {
+                    JobUtils.runJob(hcc, compiled, true);
+                }
             }
 
         } catch (Exception e) {
@@ -1905,9 +1921,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
                     query.getDatasets());
         }
+        return compiled;
     }
 
-    protected void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
         DeleteStatement stmtDelete = (DeleteStatement) stmt;
@@ -1948,7 +1965,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     @Override
     public JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt)
-            throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -2034,12 +2051,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             String description = cfps.getDescription() == null ? "" : cfps.getDescription();
             if (extendingExisting) {
                 FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE
-                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse,
-                                cfps.getSourcePolicyName());
+                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
-                    sourceFeedPolicy =
-                            MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
-                                    MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
+                    sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+                            MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
                     if (sourceFeedPolicy == null) {
                         throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName());
                     }
@@ -2158,7 +2173,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         boolean subscriberRegistered = false;
-        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, cfs.getFeedName());
         FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
@@ -2211,7 +2226,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         triple.third.get(0), pair.first);
                 pair.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
                 JobUtils.runJob(hcc, pair.first, false);
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+                eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_INTAKE_STARTED);
             } else {
                 for (IFeedJoint fj : triple.third) {
                     listener.registerFeedJoint(fj, 0);
@@ -2219,9 +2234,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_STARTED);
             if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
+                eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2251,7 +2266,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
      */
     protected Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws AsterixException {
+                    throws AsterixException {
         IFeedJoint sourceFeedJoint;
         FeedConnectionRequest request;
         List<String> functionsToApply = new ArrayList<>();
@@ -2349,10 +2364,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
-        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
         FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
                 .getActiveEntityListener(entityId);
-        if (listener == null || !listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+        if (listener == null || !listener.isEntityUsingDataset(dataverseName, datasetName)) {
             throw new AsterixException("Feed " + feed.getFeedId().getEntityName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid operation!");
         }
@@ -2372,7 +2387,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             JobUtils.runJob(hcc, jobSpec, true);
-            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED);
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -2428,8 +2443,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             if (compiled != null) {
                 FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
                         .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId());
-                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(
-                        bfs.getSubscriptionRequest().getReceivingFeedId(), null, ActivityState.ACTIVE,
+                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(),
+                        null, ActivityState.ACTIVE,
                         new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset),
                         listener.getSourceFeedJoint(), null, alteredJobSpec, policy.getProperties());
                 alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
@@ -2484,8 +2499,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     dataverseName);
             jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
-            ARecordType enforcedType = createEnforcedType(
-                    aRecordType, indexes);
+            ARecordType enforcedType = createEnforcedType(aRecordType, indexes);
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
@@ -2541,59 +2555,25 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
     }
 
-    protected void handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats) throws Exception {
-
+    protected JobSpecification handleQuery(AqlMetadataProvider metadataProvider, Query query,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
+                    throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets());
+        JobSpecification compiled;
         try {
-            JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, null);
+            compiled = rewriteCompileQuery(metadataProvider, query, null);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
             if (query.isExplain()) {
                 sessionConfig.out().flush();
-                return;
+                return compiled;
             } else if (sessionConfig.isExecuteQuery() && compiled != null) {
-                if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) {
-                    GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1));
-                }
-                JobId jobId = JobUtils.runJob(hcc, compiled, false);
-
-                JSONObject response = new JSONObject();
-                switch (resultDelivery) {
-                    case ASYNC:
-                        JSONArray handle = new JSONArray();
-                        handle.put(jobId.getId());
-                        handle.put(metadataProvider.getResultSetId().getId());
-                        response.put("handle", handle);
-                        sessionConfig.out().print(response);
-                        sessionConfig.out().flush();
-                        hcc.waitForCompletion(jobId);
-                        break;
-                    case SYNC:
-                        hcc.waitForCompletion(jobId);
-                        ResultReader resultReader = new ResultReader(hdc);
-                        resultReader.open(jobId, metadataProvider.getResultSetId());
-                        ResultUtil.displayResults(resultReader, sessionConfig, stats,
-                                metadataProvider.findOutputRecordType());
-                        break;
-                    case ASYNC_DEFERRED:
-                        handle = new JSONArray();
-                        handle.put(jobId.getId());
-                        handle.put(metadataProvider.getResultSetId().getId());
-                        response.put("handle", handle);
-                        hcc.waitForCompletion(jobId);
-                        sessionConfig.out().print(response);
-                        sessionConfig.out().flush();
-                        break;
-                    default:
-                        break;
-
-                }
+                handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats);
             }
         } catch (Exception e) {
             LOGGER.log(Level.INFO, e.getMessage(), e);
@@ -2606,6 +2586,47 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             // release external datasets' locks acquired during compilation of the query
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
+        return compiled;
+    }
+
+    private void handleQueryResult(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, JobSpecification compiled, ResultDelivery resultDelivery, Stats stats)
+                    throws Exception {
+        if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) {
+            GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1));
+        }
+        JobId jobId = JobUtils.runJob(hcc, compiled, false);
+
+        JSONObject response = new JSONObject();
+        switch (resultDelivery) {
+            case ASYNC:
+                JSONArray handle = new JSONArray();
+                handle.put(jobId.getId());
+                handle.put(metadataProvider.getResultSetId().getId());
+                response.put("handle", handle);
+                sessionConfig.out().print(response);
+                sessionConfig.out().flush();
+                hcc.waitForCompletion(jobId);
+                break;
+            case SYNC:
+                hcc.waitForCompletion(jobId);
+                ResultReader resultReader = new ResultReader(hdc);
+                resultReader.open(jobId, metadataProvider.getResultSetId());
+                ResultUtil.displayResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType());
+                break;
+            case ASYNC_DEFERRED:
+                handle = new JSONArray();
+                handle.put(jobId.getId());
+                handle.put(metadataProvider.getResultSetId().getId());
+                response.put("handle", handle);
+                hcc.waitForCompletion(jobId);
+                sessionConfig.out().print(response);
+                sessionConfig.out().flush();
+                break;
+            default:
+                break;
+
+        }
     }
 
     protected void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt)
@@ -2876,8 +2897,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 break;
             default:
                 throw new AlgebricksException(
-                        "The system \"" + runStmt.getSystem() +
-                                "\" specified in your run statement is not supported.");
+                        "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported.");
         }
 
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index a548b3a..a806532 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -170,7 +170,7 @@ public class TestNodeController {
                 indexOpDesc, ctx, PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider,
                 op, true);
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
-                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION);
+                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
         return insertOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index 6e3f94c..2716859 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -30,6 +30,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.app.cc.CompilerExtensionManager;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -38,6 +39,7 @@ import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.test.base.AsterixTestHelper;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.runtime.HDFSCluster;
@@ -66,15 +68,16 @@ public class OptimizerTest {
             + "optimizerts" + SEPARATOR;
     private static final String PATH_QUERIES = PATH_BASE + "queries" + SEPARATOR;
     private static final String PATH_EXPECTED = PATH_BASE + "results" + SEPARATOR;
-    private static final String PATH_ACTUAL = "target" + File.separator + "opttest" + SEPARATOR;
+    protected static final String PATH_ACTUAL = "target" + File.separator + "opttest" + SEPARATOR;
 
     private static final ArrayList<String> ignore = AsterixTestHelper.readFile(FILENAME_IGNORE, PATH_BASE);
     private static final ArrayList<String> only = AsterixTestHelper.readFile(FILENAME_ONLY, PATH_BASE);
-    private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    protected static String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final ILangCompilationProvider aqlCompilationProvider = new AqlCompilationProvider();
     private static final ILangCompilationProvider sqlppCompilationProvider = new SqlppCompilationProvider();
+    protected static ILangCompilationProvider extensionLangCompilationProvider = null;
 
-    private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+    protected static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -171,9 +174,13 @@ public class OptimizerTest {
             PrintWriter plan = new PrintWriter(actualFile);
             ILangCompilationProvider provider = queryFile.getName().endsWith("aql") ? aqlCompilationProvider
                     : sqlppCompilationProvider;
+            if (extensionLangCompilationProvider != null) {
+                provider = extensionLangCompilationProvider;
+            }
             IHyracksClientConnection hcc = integrationUtil.getHyracksClientConnection();
             AsterixJavaClient asterix = new AsterixJavaClient(hcc, query, plan, provider,
-                    new DefaultStatementExecutorFactory(null));
+                    new DefaultStatementExecutorFactory(
+                            (CompilerExtensionManager) AsterixAppContextInfo.INSTANCE.getExtensionManager()));
             try {
                 asterix.compile(true, false, false, true, true, false, false);
             } catch (AsterixException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 39a4d3b..3929113 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -52,8 +52,12 @@ public class ExecutionTestUtil {
         return setUp(cleanup, TEST_CONFIG_FILE_NAME, integrationUtil, true);
     }
 
+    public static List<ILibraryManager> setUp(boolean cleanup, String configFile) throws Exception {
+        return setUp(cleanup, configFile, integrationUtil, true);
+    }
+
     public static List<ILibraryManager> setUp(boolean cleanup, String configFile,
-            AsterixHyracksIntegrationUtil integrationUtil, boolean startHdfs) throws Exception {
+            AsterixHyracksIntegrationUtil alternateIntegrationUtil, boolean startHdfs) throws Exception {
         System.out.println("Starting setup");
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting setup");
@@ -63,6 +67,7 @@ public class ExecutionTestUtil {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("initializing pseudo cluster");
         }
+        integrationUtil = alternateIntegrationUtil;
         integrationUtil.init(cleanup);
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -87,8 +92,8 @@ public class ExecutionTestUtil {
         libraryManagers.add(AsterixAppContextInfo.INSTANCE.getLibraryManager());
         // Adds library managers for NCs, one-per-NC.
         for (NodeControllerService nc : integrationUtil.ncs) {
-            IAsterixAppRuntimeContext runtimeCtx =
-                    (IAsterixAppRuntimeContext) nc.getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext()
+                    .getApplicationObject();
             libraryManagers.add(runtimeCtx.getLibraryManager());
         }
         return libraryManagers;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql
new file mode 100644
index 0000000..205edc2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string,
+  location:point
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
+
+
+insert into dataset TweetMessageuuids as $a(
+let $x :=
+[{ "tweetid":1,"message-text":"hello","location":create-point(6.0,6.0)},
+{"tweetid":2,"message-text":"goodbye","location":create-point(1.0,1.0)},
+{"tweetid":3,"message-text":"the end","location":create-point(6.0,3.0)},
+{"tweetid":4,"message-text":"what","location":create-point(3.0,6.0)},
+{"tweetid":5,"message-text":"good","location":create-point(5.0,6.0)}]
+for $y in $x
+return $y
+) returning
+let $x := create-circle($a.location,5.0)
+order by $a.tweetid
+return {
+  "x":$x,
+  "tweetid":$a.tweetid
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
new file mode 100644
index 0000000..391b248
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
@@ -0,0 +1,17 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- STABLE_SORT [$$16(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- COMMIT  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- INSERT_DELETE  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$16]  |PARTITIONED|
+                        -- ASSIGN  |UNPARTITIONED|
+                          -- STREAM_PROJECT  |UNPARTITIONED|
+                            -- UNNEST  |UNPARTITIONED|
+                              -- ASSIGN  |UNPARTITIONED|
+                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql
new file mode 100644
index 0000000..a75f9da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-return-records
+ * Description     : Check records returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql
new file mode 100644
index 0000000..6fc7112
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-return-records
+ * Description     : Check records returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+ use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+[{ "tweetid":1,"message-text":"hello"},
+{"tweetid":2,"message-text":"goodbye"},
+{"tweetid":3,"message-text":"the end"},
+{"tweetid":4,"message-text":"what"},
+{"tweetid":5,"message-text":"good"}]
+) returning $message;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql
new file mode 100644
index 0000000..9e77afb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-returning-fieldname
+ * Description     : Check fields returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql
new file mode 100644
index 0000000..6a7be26
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-returning-fieldname
+ * Description     : Check fields returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+{ "message-text":"hello"}
+) returning $message.message-text;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql
new file mode 100644
index 0000000..a75f9da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-with-bad-return
+ * Description     : Throw an error
+ * Expected Result : Error
+ * Date            : Oct 2016
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql
new file mode 100644
index 0000000..417860d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-with-bad-return
+ * Description     : Throw an error
+ * Expected Result : Error
+ * Date            : Oct 2016
+ */
+
+ use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+[{ "tweetid":1,"message-text":"hello"},
+{"tweetid":2,"message-text":"goodbye"},
+{"tweetid":3,"message-text":"the end"},
+{"tweetid":4,"message-text":"what"},
+{"tweetid":5,"message-text":"good"}]
+) returning
+for $result in dataset TweetMessageuuids
+where $result.message-text = $message
+return $result;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql
new file mode 100644
index 0000000..50ff8ae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : upsert-return-custom-result
+ * Description     : Check records returned on upsert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string,
+  location:point
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql
new file mode 100644
index 0000000..9abf667
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : upsert-return-custom-result
+ * Description     : Check records returned on upsert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+ use dataverse test;
+
+upsert into dataset TweetMessageuuids as $a (
+let $x :=
+[{ "tweetid":1,"message-text":"hello","location":create-point(6.0,6.0)},
+{"tweetid":2,"message-text":"goodbye","location":create-point(1.0,1.0)},
+{"tweetid":3,"message-text":"the end","location":create-point(6.0,3.0)},
+{"tweetid":4,"message-text":"what","location":create-point(3.0,6.0)},
+{"tweetid":5,"message-text":"good","location":create-point(5.0,6.0)}]
+for $y in $x
+return $y
+) returning
+let $x := create-circle($a.location,5.0)
+order by $a.tweetid
+return {
+  "x":$x,
+  "tweetid":$a.tweetid
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm
new file mode 100644
index 0000000..2ee9b1c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm
@@ -0,0 +1,5 @@
+{ "tweetid": 1, "message-text": "hello" }
+{ "tweetid": 2, "message-text": "goodbye" }
+{ "tweetid": 4, "message-text": "what" }
+{ "tweetid": 3, "message-text": "the end" }
+{ "tweetid": 5, "message-text": "good" }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm
new file mode 100644
index 0000000..84ed78b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm
@@ -0,0 +1 @@
+"hello"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm
new file mode 100644
index 0000000..8706beb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm
@@ -0,0 +1,5 @@
+{ "x": circle("6.0,6.0 5.0"), "tweetid": 1 }
+{ "x": circle("1.0,1.0 5.0"), "tweetid": 2 }
+{ "x": circle("3.0,6.0 5.0"), "tweetid": 4 }
+{ "x": circle("6.0,3.0 5.0"), "tweetid": 3 }
+{ "x": circle("5.0,6.0 5.0"), "tweetid": 5 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 85884f8..d0e342c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -1699,6 +1699,27 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="dml">
+      <compilation-unit name="insert-return-records">
+        <output-dir compare="Text">insert-return-records</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="insert-returning-fieldname">
+        <output-dir compare="Text">insert-returning-fieldname</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="insert-with-bad-return">
+        <output-dir compare="Text">insert-with-bad-return</output-dir>
+        <expected-error>Error: Cannot use datasets in an insert returning query</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="upsert-return-custom-result">
+        <output-dir compare="Text">upsert-return-custom-result</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
       <compilation-unit name="insert-src-dst-01">
         <output-dir compare="Text">insert-src-dst-01</output-dir>
       </compilation-unit>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-doc/src/site/markdown/aql/manual.md b/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
index 510bc83..393beec 100644
--- a/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
+++ b/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
@@ -824,7 +824,7 @@ data that has been prepared in ADM format.
 
 #### Insert
 
-    InsertStatement ::= "insert" "into" "dataset" QualifiedName Query
+    InsertStatement ::= "insert" "into" "dataset" QualifiedName ( "as" Variable )? Query ( "returning" Query )?
 
 The AQL insert statement is used to insert data into a dataset.
 The data to be inserted comes from an AQL query expression.
@@ -834,13 +834,17 @@ being the insertion of a single object plus its affiliated secondary index entri
 If the query part of an insert returns a single object, then the insert statement itself will
 be a single, atomic transaction.
 If the query part returns multiple objects, then each object inserted will be handled independently
-as a tranaction. If a dataset has an auto-generated primary key field, an insert statement should not include a value for that field in it. (The system will automatically extend the provided record with this additional field and a corresponding value.)
+as a transaction. If a dataset has an auto-generated primary key field, an insert statement should not include a value for that field in it. (The system will automatically extend the provided record with this additional field and a corresponding value.)
+The optional "as Variable" provides a variable binding for the inserted records, which can be used in the "returning" clause.
+The optional "returning Query" allows users to run simple queries/functions on the records returned by the insert.
+This query cannot refer to any datasets.
+
 
 The following example illustrates a query-based insertion.
 
 ##### Example
 
-    insert into dataset UsersCopy (for $user in dataset FacebookUsers return $user)
+    insert into dataset UsersCopy as $inserted (for $user in dataset FacebookUsers return $user ) returning $inserted.screen-name
 
 #### Delete
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
new file mode 100644
index 0000000..be9a245
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+
+public interface IActiveLifecycleEventSubscriber {
+
+    public enum ActiveLifecycleEvent {
+        FEED_INTAKE_STARTED,
+        FEED_COLLECT_STARTED,
+        FEED_INTAKE_FAILURE,
+        FEED_COLLECT_FAILURE,
+        FEED_INTAKE_ENDED,
+        FEED_COLLECT_ENDED,
+        ACTIVE_JOB_STARTED,
+        ACTIVE_JOB_ENDED,
+        ACTIVE_JOB_FAILED
+    }
+
+    public void assertEvent(ActiveLifecycleEvent event) throws AsterixException, InterruptedException;
+
+    public void handleEvent(ActiveLifecycleEvent event);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
deleted file mode 100644
index ad3c1c9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface IFeedLifecycleEventSubscriber {
-
-    public enum FeedLifecycleEvent {
-        FEED_INTAKE_STARTED,
-        FEED_COLLECT_STARTED,
-        FEED_INTAKE_FAILURE,
-        FEED_COLLECT_FAILURE,
-        FEED_INTAKE_ENDED,
-        FEED_COLLECT_ENDED
-    }
-
-    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException;
-
-    public void handleFeedEvent(FeedLifecycleEvent event);
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
new file mode 100644
index 0000000..7f8efc5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+
+public class ActiveLifecycleEventSubscriber implements IActiveLifecycleEventSubscriber {
+
+    private LinkedBlockingQueue<ActiveLifecycleEvent> inbox;
+
+    public ActiveLifecycleEventSubscriber() {
+        this.inbox = new LinkedBlockingQueue<>();
+    }
+
+    @Override
+    public void handleEvent(ActiveLifecycleEvent event) {
+        inbox.add(event);
+    }
+
+    @Override
+    public void assertEvent(ActiveLifecycleEvent event) throws AsterixException, InterruptedException {
+        boolean eventOccurred = false;
+        ActiveLifecycleEvent e;
+        Iterator<ActiveLifecycleEvent> eventsSoFar = inbox.iterator();
+        while (eventsSoFar.hasNext()) {
+            e = eventsSoFar.next();
+            assertNoFailure(e);
+            eventOccurred = e.equals(event);
+        }
+
+        while (!eventOccurred) {
+            e = inbox.take();
+            eventOccurred = e.equals(event);
+            if (!eventOccurred) {
+                assertNoFailure(e);
+            }
+        }
+    }
+
+    private void assertNoFailure(ActiveLifecycleEvent e) throws AsterixException {
+        if (e.equals(ActiveLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(ActiveLifecycleEvent.FEED_COLLECT_FAILURE)
+                || e.equals(ActiveLifecycleEvent.ACTIVE_JOB_FAILED)) {
+            throw new AsterixException("Failure in active job.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 0c49c92..02d2c8b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -34,8 +34,8 @@ import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.external.feed.api.FeedOperationCounter;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedJoint.State;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
@@ -63,7 +63,7 @@ import org.apache.log4j.Logger;
 public class FeedEventsListener implements IActiveEntityEventsListener {
     private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class);
     private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
-    private final List<IFeedLifecycleEventSubscriber> subscribers;
+    private final List<IActiveLifecycleEventSubscriber> subscribers;
     private final Map<Long, ActiveJob> jobs;
     private final Map<Long, ActiveJob> intakeJobs;
     private final Map<EntityId, FeedIntakeInfo> entity2Intake;
@@ -145,7 +145,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
             case FEED_CONNECT:
                 ((FeedConnectJobInfo) jobInfo).partitionStart();
                 if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
-                    notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+                    notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_COLLECT_STARTED);
                 }
                 break;
             case INTAKE:
@@ -161,7 +161,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         if (feedPipeline.get(message.getEntityId()).first.decrementAndGet() == 0) {
             ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
             jobInfo.setState(ActivityState.ACTIVE);
-            notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+            notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_INTAKE_STARTED);
         }
     }
 
@@ -339,10 +339,10 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         return locations;
     }
 
-    private synchronized void notifyFeedEventSubscribers(FeedLifecycleEvent event) {
+    private synchronized void notifyFeedEventSubscribers(ActiveLifecycleEvent event) {
         if (subscribers != null && !subscribers.isEmpty()) {
-            for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
-                subscriber.handleFeedEvent(event);
+            for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
+                subscriber.handleEvent(event);
             }
         }
     }
@@ -362,8 +362,8 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         // notify event listeners
         feedPipeline.remove(feedId);
         entity2Intake.remove(feedId);
-        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
-                : FeedLifecycleEvent.FEED_INTAKE_ENDED);
+        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? ActiveLifecycleEvent.FEED_INTAKE_FAILURE
+                : ActiveLifecycleEvent.FEED_INTAKE_ENDED);
     }
 
     private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
@@ -389,8 +389,8 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         connectJobInfos.remove(connectionId);
         jobs.remove(cInfo.getJobId().getId());
         // notify event listeners
-        FeedLifecycleEvent event =
-                failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_COLLECT_ENDED;
+        ActiveLifecycleEvent event =
+                failure ? ActiveLifecycleEvent.FEED_COLLECT_FAILURE : ActiveLifecycleEvent.FEED_COLLECT_ENDED;
         notifyFeedEventSubscribers(event);
     }
 
@@ -569,16 +569,16 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
 
     }
 
-    public synchronized void registerFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+    public synchronized void registerFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
         subscribers.add(subscriber);
     }
 
-    public void deregisterFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+    public void deregisterFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
         subscribers.remove(subscriber);
     }
 
     public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
-            IFeedLifecycleEventSubscriber eventSubscriber) {
+            IActiveLifecycleEventSubscriber eventSubscriber) {
         boolean active = false;
         FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
         if (cInfo != null) {
@@ -643,7 +643,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
     }
 
     @Override
-    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName) {
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
         return isConnectedToDataset(datasetName);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java
deleted file mode 100644
index 6e3cebce..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-
-public class FeedLifecycleEventSubscriber implements IFeedLifecycleEventSubscriber {
-
-    private LinkedBlockingQueue<FeedLifecycleEvent> inbox;
-
-    public FeedLifecycleEventSubscriber() {
-        this.inbox = new LinkedBlockingQueue<FeedLifecycleEvent>();
-    }
-
-    @Override
-    public void handleFeedEvent(FeedLifecycleEvent event) {
-        inbox.add(event);
-    }
-
-    @Override
-    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException {
-        boolean eventOccurred = false;
-        FeedLifecycleEvent e = null;
-        Iterator<FeedLifecycleEvent> eventsSoFar = inbox.iterator();
-        while (eventsSoFar.hasNext()) {
-            e = eventsSoFar.next();
-            assertNoFailure(e);
-            eventOccurred = e.equals(event);
-        }
-
-        while (!eventOccurred) {
-            e = inbox.take();
-            eventOccurred = e.equals(event);
-            if (!eventOccurred) {
-                assertNoFailure(e);
-            }
-        }
-    }
-
-    private void assertNoFailure(FeedLifecycleEvent e) throws AsterixException {
-        if (e.equals(FeedLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(FeedLifecycleEvent.FEED_COLLECT_FAILURE)) {
-            throw new AsterixException("Failure in feed");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
deleted file mode 100644
index aac6676..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.external.feed.watch;
-
-import java.util.Map;
-
-public class FeedActivity implements Comparable<FeedActivity> {
-
-    private int activityId;
-
-    private final String dataverseName;
-    private final String datasetName;
-    private final String feedName;
-    private final Map<String, String> feedActivityDetails;
-
-    public static class FeedActivityDetails {
-        public static final String INTAKE_LOCATIONS = "intake-locations";
-        public static final String COMPUTE_LOCATIONS = "compute-locations";
-        public static final String STORAGE_LOCATIONS = "storage-locations";
-        public static final String COLLECT_LOCATIONS = "collect-locations";
-        public static final String FEED_POLICY_NAME = "feed-policy-name";
-        public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
-    }
-
-    public FeedActivity(String dataverseName, String feedName, String datasetName,
-            Map<String, String> feedActivityDetails) {
-        this.dataverseName = dataverseName;
-        this.feedName = feedName;
-        this.datasetName = datasetName;
-        this.feedActivityDetails = feedActivityDetails;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public String getDatasetName() {
-        return datasetName;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (this == other) {
-            return true;
-        }
-        if (!(other instanceof FeedActivity)) {
-            return false;
-        }
-
-        if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).datasetName.equals(datasetName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).getFeedName().equals(feedName)) {
-            return false;
-        }
-        if (((FeedActivity) other).getActivityId() != (activityId)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
-    }
-
-    public String getConnectTimestamp() {
-        return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
-    }
-
-    public int getActivityId() {
-        return activityId;
-    }
-
-    public void setActivityId(int activityId) {
-        this.activityId = activityId;
-    }
-
-    public Map<String, String> getFeedActivityDetails() {
-        return feedActivityDetails;
-    }
-
-    @Override
-    public int compareTo(FeedActivity o) {
-        return o.getActivityId() - this.activityId;
-    }
-
-}


Mime
View raw message