asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [2/2] asterixdb git commit: Support IFrameWriter contract check.
Date Sat, 10 Jun 2017 16:57:04 GMT
Support IFrameWriter contract check.

- add a instance-level flag for injecting operators to
  check IFrameWriter contract violations;
- check contract violations in runtime tests.

Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1618
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/8cf8be67
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/8cf8be67
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/8cf8be67

Branch: refs/heads/master
Commit: 8cf8be67c3e38f161eeaabefe9f84187de2236e3
Parents: 456cb9f
Author: Till Westmann <tillw@apache.org>
Authored: Fri Jun 9 20:15:33 2017 -0700
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Sat Jun 10 09:56:35 2017 -0700

----------------------------------------------------------------------
 .../common/AsterixHyracksIntegrationUtil.java   |  12 +-
 .../asterix/app/translator/QueryTranslator.java |  70 ++++++-----
 .../app/bootstrap/TestNodeController.java       |   2 +-
 .../aql/translator/QueryTranslatorTest.java     |   8 ++
 .../apache/asterix/common/utils/JobUtils.java   |  10 +-
 .../operators/physical/SinkPOperator.java       |  10 +-
 .../runtime/base/EnforcePushRuntime.java        |  49 ++++++++
 .../algebricks/runtime/base/IPushRuntime.java   |  23 +++-
 .../aggreg/AggregateRuntimeFactory.java         |   1 +
 ...estedPlansAccumulatingAggregatorFactory.java |  13 +-
 .../NestedPlansRunningAggregatorFactory.java    |  21 +++-
 .../base/AbstractOneInputPushRuntime.java       |   2 +-
 .../base/AbstractOneInputSinkPushRuntime.java   |   2 +-
 .../base/AbstractOneInputSourcePushRuntime.java |  11 +-
 .../meta/AlgebricksMetaOperatorDescriptor.java  |  10 +-
 .../operators/meta/PipelineAssembler.java       |  14 ++-
 .../sort/InMemorySortRuntimeFactory.java        |   5 -
 .../std/EmptyTupleSourceRuntimeFactory.java     |   5 +
 .../std/NestedTupleSourceRuntimeFactory.java    |   5 -
 .../std/RunningAggregateRuntimeFactory.java     |   2 +-
 .../operators/std/SinkWriterRuntime.java        |   1 +
 .../std/StreamSelectRuntimeFactory.java         |  19 +--
 .../client/HyracksClientInterfaceFunctions.java |   2 +-
 .../hyracks/api/client/HyracksConnection.java   |  13 +-
 .../api/context/IHyracksTaskContext.java        |   4 +
 .../api/dataflow/EnforceFrameWriter.java        | 120 +++++++++++++++++++
 .../api/dataflow/IOperatorNodePushable.java     |  13 +-
 .../hyracks/api/exceptions/ErrorCode.java       |   8 ++
 .../org/apache/hyracks/api/job/JobFlag.java     |   3 +-
 .../SuperActivityOperatorNodePushable.java      |  16 ++-
 .../src/main/resources/errormsg/en.properties   |   9 +-
 .../apache/hyracks/control/cc/job/JobRun.java   |   5 +-
 .../hyracks/control/cc/work/JobStartWork.java   |   2 +-
 .../control/common/controllers/CCConfig.java    |  15 ++-
 .../control/common/controllers/NCConfig.java    |   9 +-
 .../org/apache/hyracks/control/nc/Task.java     |  14 ++-
 .../hyracks/control/nc/work/StartTasksWork.java |  55 +++++----
 .../preclustered/PreclusteredGroupWriter.java   |   3 +
 .../std/misc/NullSinkOperatorDescriptor.java    |  22 +---
 .../std/misc/SinkOperatorDescriptor.java        |  22 +---
 .../std/misc/SinkOperatorNodePushable.java      |  48 ++++++++
 .../std/sort/AbstractSortRunGenerator.java      |   3 +
 .../IndexSearchOperatorNodePushable.java        |  17 ++-
 .../hyracks/test/support/TestTaskContext.java   |   8 ++
 44 files changed, 504 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 9d01d63..ba98f73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -25,7 +25,6 @@ import java.io.File;
 import java.io.IOException;
 import java.net.Inet4Address;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -42,9 +41,6 @@ import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -131,6 +127,7 @@ public class AsterixHyracksIntegrationUtil {
         ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
         ccConfig.setResultTTL(120000L);
         ccConfig.setResultSweepThreshold(1000L);
+        ccConfig.setEnforceFrameWriterProtocol(true);
         configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb"));
         return ccConfig;
     }
@@ -215,13 +212,6 @@ public class AsterixHyracksIntegrationUtil {
         }
     }
 
-    public void runJob(JobSpecification spec) throws Exception {
-        GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
-        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-        GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
-        hcc.waitForCompletion(jobId);
-    }
-
     protected String getDefaultStoragePath() {
         return joinPath("target", "io", "dir");
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 80c05db..7ce9df6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -28,6 +28,7 @@ import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -191,8 +192,10 @@ import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.UnmanagedFileSplit;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 /*
@@ -214,6 +217,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     protected final IRewriterFactory rewriterFactory;
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
+    protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
 
     public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
@@ -228,6 +232,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         rewriterFactory = compliationProvider.getRewriterFactory();
         activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
         this.executorService = executorService;
+        if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) {
+            this.jobFlags.add(JobFlag.ENFORCE_CONTRACT);
+        }
     }
 
     public SessionOutput getSessionOutput() {
@@ -621,7 +628,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
 
                 // #. runJob
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
 
                 // #. begin new metadataTxn
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -653,7 +660,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -900,7 +907,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                                 "Failed to create job spec for replicating Files Index For external dataset");
                     }
                     filesIndexReplicated = true;
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 }
             }
 
@@ -937,7 +944,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             // #. create the index artifact in NC.
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
@@ -948,7 +955,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -986,7 +993,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                             ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1005,7 +1012,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1189,7 +1196,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1226,7 +1233,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        runJob(hcc, jobSpec);
                     }
                 } catch (Exception e2) {
                     // do no throw exception since still the metadata needs to be compensated.
@@ -1405,7 +1412,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 }
 
                 // #. begin a new transaction
@@ -1466,7 +1473,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 }
 
                 // #. begin a new transaction
@@ -1496,7 +1503,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        runJob(hcc, jobSpec);
                     }
                 } catch (Exception e2) {
                     // do no throw exception since still the metadata needs to be compensated.
@@ -1659,7 +1666,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
-                JobUtils.runJob(hcc, spec, true);
+                runJob(hcc, spec);
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -1725,7 +1732,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 if (jobSpec == null) {
                     return jobSpec;
                 }
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             } finally {
                 locker.unlock();
             }
@@ -1753,7 +1760,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             bActiveTxn = false;
 
             if (jobSpec != null && !compileOnly) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
             return jobSpec;
         } catch (Exception e) {
@@ -1935,7 +1942,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             } else {
                 JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
                         MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()));
-                JobUtils.runJob(hcc, spec, true);
+                runJob(hcc, spec);
                 MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
             }
 
@@ -2022,6 +2029,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             activeEventHandler.registerListener(listener);
             IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED);
             feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
+
+            // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
+            // We will need to design general exception handling mechanism for feeds.
             JobUtils.runJob(hcc, feedJob,
                     Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
             eventSubscriber.sync();
@@ -2211,7 +2221,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
             // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2300,14 +2310,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 }
                 break;
             case IMMEDIATE:
-                createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+                createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
                 }, clientContextId, ctx);
                 break;
             case DEFERRED:
-                createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+                createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
                     if (outMetadata != null) {
                         outMetadata.getResultSets()
@@ -2325,7 +2335,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             ResultSetId resultSetId, MutableBoolean printed) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
-            createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
+            createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
                 final ResultHandle handle = new ResultHandle(id, resultSetId);
                 ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING);
                 ResultUtil.printResultHandle(sessionOutput, handle);
@@ -2353,16 +2363,20 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    private static void createAndRunJob(IHyracksClientConnection hcc, Mutable<JobId> jId, IStatementCompiler compiler,
-            IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId,
-            IStatementExecutorContext ctx) throws Exception {
+    private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception {
+        JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+    }
+
+    private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
+            IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
+            String clientContextId, IStatementExecutorContext ctx) throws Exception {
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
                 return;
             }
-            final JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+            final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             if (ctx != null && clientContextId != null) {
                 ctx.put(clientContextId, jobId); // Adds the running job into the context.
             }
@@ -2507,14 +2521,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             transactionState = TransactionState.BEGIN;
 
             // run the files update job
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
                     spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles,
                             appendedFiles, metadataProvider);
                     // run the files update job
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 }
             }
 
@@ -2533,7 +2547,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             bActiveTxn = false;
             transactionState = TransactionState.READY_TO_COMMIT;
             // We don't release the latch since this job is expected to be quick
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
             // Start a new metadata transaction to record the final state of the transaction
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2602,7 +2616,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 try {
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 } catch (Exception e2) {
                     // This should never happen -- fix throw illegal
                     e.addSuppressed(e2);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f99793a..7d4b41d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -205,7 +205,7 @@ public class TestNodeController {
                 NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
         BTreeSearchOperatorNodePushable searchOp =
                 searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1);
-        emptyTupleOp.setFrameWriter(0, searchOp,
+        emptyTupleOp.setOutputFrameWriter(0, searchOp,
                 primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
         searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
         return emptyTupleOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 5e3da5f..b80f8f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -40,6 +40,9 @@ import org.apache.asterix.lang.common.statement.RunStatement;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -59,6 +62,11 @@ public class QueryTranslatorTest {
         ExternalProperties mockAsterixExternalProperties = mock(ExternalProperties.class);
         when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties);
         when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002);
+        ICCServiceContext mockServiceContext = mock(ICCServiceContext.class);
+        when(mockAsterixAppContextInfo.getServiceContext()).thenReturn(mockServiceContext);
+        IApplicationConfig mockApplicationConfig = mock(IApplicationConfig.class);
+        when(mockServiceContext.getAppConfig()).thenReturn(mockApplicationConfig);
+        when(mockApplicationConfig.getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)).thenReturn(true);
 
         // Mocks AsterixClusterProperties.
         Cluster mockCluster = mock(Cluster.class);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index 41f9e67..cacbfbc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -19,7 +19,10 @@
 
 package org.apache.asterix.common.utils;
 
+import java.util.EnumSet;
+
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
@@ -32,8 +35,13 @@ public class JobUtils {
 
     public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
             throws Exception {
+        return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion);
+    }
+
+    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+            boolean waitForCompletion) throws Exception {
         spec.setMaxReattempts(0);
-        final JobId jobId = hcc.startJob(spec);
+        final JobId jobId = hcc.startJob(spec, jobFlags);
         if (waitForCompletion) {
             hcc.waitForCompletion(jobId);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 71acecf..d0b7b47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -21,28 +21,20 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 
 public class SinkPOperator extends AbstractPhysicalOperator {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
new file mode 100644
index 0000000..26002e6
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class EnforcePushRuntime extends EnforceFrameWriter implements IPushRuntime {
+
+    private final IPushRuntime pushRuntime;
+
+    private EnforcePushRuntime(IPushRuntime pushRuntime) {
+        super(pushRuntime);
+        this.pushRuntime = pushRuntime;
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        pushRuntime.setOutputFrameWriter(index, writer, recordDesc);
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        pushRuntime.setInputRecordDescriptor(index, recordDescriptor);
+    }
+
+    public static IPushRuntime enforce(IPushRuntime pushRuntime) {
+        return pushRuntime instanceof EnforcePushRuntime || pushRuntime instanceof NestedTupleSourceRuntime
+                ? pushRuntime : new EnforcePushRuntime(pushRuntime);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
index 27d6900..de00697 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -22,7 +22,26 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
 public interface IPushRuntime extends IFrameWriter {
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
 
-    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+    /**
+     * Sets the output frame writer for this writer.
+     *
+     * @param index,
+     *            the index of the output channel.
+     * @param writer,
+     *            the writer for writing output.
+     * @param recordDesc,
+     *            the output record descriptor.
+     */
+    void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+    /**
+     * Sets the input record descriptor for this writer.
+     *
+     * @param index,
+     *            the index of the input channel.
+     * @param recordDescriptor,
+     *            the corresponding input record descriptor.
+     */
+    void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 42e5157..e99b61b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -124,6 +124,7 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
 
             @Override
             public void fail() throws HyracksDataException {
+                failed = true;
                 if (isOpen) {
                     writer.fail();
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index b397f23..893aa61 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.algebricks.runtime.operators.aggreg;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -29,6 +30,7 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
@@ -52,7 +54,6 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
-
         final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
@@ -91,8 +92,8 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
             }
 
             @Override
-            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);
                     pipelines[i].close();
@@ -144,9 +145,13 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
         IFrameWriter start = writer;
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
-            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforcePushRuntime.enforce(start) : start;
+            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
             } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 52245e1..8b8e320 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.algebricks.runtime.operators.aggreg;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -28,8 +29,10 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -58,11 +61,14 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
             final IFrameWriter writer) throws HyracksDataException {
-        final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
-                decorFieldIdx.length, writer);
+        final RunningAggregatorOutput outputWriter =
+                new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer);
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+        IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter;
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
-                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+            pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], enforcedWriter, ctx);
         }
 
         final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
@@ -136,13 +142,17 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
 
     private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
             throws HyracksDataException {
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         // plug the operators
         IFrameWriter start = writer;
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
-            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforceFrameWriter.enforce(start) : start;
+            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
             } else {
@@ -206,8 +216,9 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
                 int start = 0;
                 int offset = 0;
                 for (int i = 0; i < fieldEnds.length; i++) {
-                    if (i > 0)
+                    if (i > 0) {
                         start = fieldEnds[i - 1];
+                    }
                     offset = fieldEnds[i] - start;
                     tb.addField(data, start, offset);
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 33e7c73..5cced8d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -29,7 +29,7 @@ public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
     protected boolean failed;
 
     @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         this.writer = writer;
         this.outputRecordDesc = recordDesc;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
index e430461..d47199d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -26,7 +26,7 @@ public abstract class AbstractOneInputSinkPushRuntime implements IPushRuntime {
     protected RecordDescriptor inputRecordDesc;
 
     @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         throw new IllegalStateException();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index b7707d4..35563e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -27,20 +27,29 @@ public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInput
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // nextFrame will never be called on this runtime
         throw new UnsupportedOperationException();
     }
 
     @Override
     public void close() throws HyracksDataException {
+        // close is a no op since this operator completes operating in open()
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        writer.fail();
+        // fail is a no op since if a failure happened, the operator would've already called fail() on downstream
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // flush will never be called on this runtime
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        // setInputRecordDescriptor will never be called on this runtime since it has no input
         throw new UnsupportedOperationException();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 1294614..f6ebf19 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -112,6 +112,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
 
             private IFrameWriter startOfPipeline;
+            private boolean opened = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -124,6 +125,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
                             pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
                     startOfPipeline = pa.assemblePipeline(writer, ctx);
                 }
+                opened = true;
                 startOfPipeline.open();
             }
 
@@ -134,12 +136,16 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
 
             @Override
             public void close() throws HyracksDataException {
-                startOfPipeline.close();
+                if (opened) {
+                    startOfPipeline.close();
+                }
             }
 
             @Override
             public void fail() throws HyracksDataException {
-                startOfPipeline.fail();
+                if (opened) {
+                    startOfPipeline.fail();
+                }
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 03e2aaf..e1081e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -19,11 +19,14 @@
 package org.apache.hyracks.algebricks.runtime.operators.meta;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 
 public class PipelineAssembler {
 
@@ -44,18 +47,21 @@ public class PipelineAssembler {
         this.outputArity = outputArity;
     }
 
-    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws
-            HyracksDataException {
+    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         // plug the operators
         IFrameWriter start = writer;// this.writer;
         for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforceFrameWriter.enforce(start) : start;
             if (i == pipeline.getRuntimeFactories().length - 1) {
                 if (outputArity == 1) {
-                    newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
+                    newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor);
                 }
             } else {
-                newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
+                newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
             }
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 3e30f73..925ff93 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -79,11 +79,6 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
             }
 
             @Override
-            public void fail() throws HyracksDataException {
-                writer.fail();
-            }
-
-            @Override
             public void close() throws HyracksDataException {
                 try {
                     frameSorter.sort();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 2b7c2da..3ccceed 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -56,6 +56,11 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
             }
 
             @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
             public void close() throws HyracksDataException {
                 writer.close();
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index e123adf..496679f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -65,11 +65,6 @@ public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
         }
 
         @Override
-        public void fail() throws HyracksDataException {
-            writer.fail();
-        }
-
-        @Override
         public void flush() throws HyracksDataException {
             writer.flush();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 38fe7d1..33b7725 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -118,7 +118,7 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
             @Override
             public void fail() throws HyracksDataException {
                 if (isOpen) {
-                    super.fail();
+                    writer.fail();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index ebf3d3a..55146e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -81,6 +81,7 @@ public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
 
     @Override
     public void fail() throws HyracksDataException {
+        // fail() is a no op. in close we will cleanup
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 0f57fd7..171544d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -85,7 +85,6 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
             private IScalarEvaluator eval;
             private IMissingWriter missingWriter = null;
             private ArrayTupleBuilder missingTupleBuilder = null;
-            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -93,7 +92,6 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
                     initAccessAppendFieldRef(ctx);
                     eval = cond.createScalarEvaluator(ctx);
                 }
-                isOpen = true;
                 writer.open();
 
                 //prepare nullTupleBuilder
@@ -107,20 +105,11 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
             }
 
             @Override
-            public void fail() throws HyracksDataException {
-                if (isOpen) {
-                    super.fail();
-                }
-            }
-
-            @Override
             public void close() throws HyracksDataException {
-                if (isOpen) {
-                    try {
-                        flushIfNotFailed();
-                    } finally {
-                        writer.close();
-                    }
+                try {
+                    flushIfNotFailed();
+                } finally {
+                    writer.close();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa9232e..e2868ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -177,7 +177,7 @@ public class HyracksClientInterfaceFunctions {
         }
 
         public StartJobFunction(JobId jobId) {
-            this(null, null, null, jobId);
+            this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
         }
 
         public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index ad54110..75cbf61 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -102,14 +102,14 @@ public final class HyracksConnection implements IHyracksClientConnection {
 
     @Override
     public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
-                jobSpec);
+        IActivityClusterGraphGeneratorFactory jsacggf =
+                new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return startJob(jsacggf, jobFlags);
     }
 
     @Override
     public JobId distributeJob(JobSpecification jobSpec) throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+        IActivityClusterGraphGeneratorFactory jsacggf =
                 new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return distributeJob(jsacggf);
     }
@@ -212,15 +212,14 @@ public final class HyracksConnection implements IHyracksClientConnection {
     @Override
     public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
             throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
-                jobSpec);
+        IActivityClusterGraphGeneratorFactory jsacggf =
+                new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return startJob(deploymentId, jsacggf, jobFlags);
     }
 
     @Override
     public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
-            EnumSet<JobFlag> jobFlags)
-            throws Exception {
+            EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index c8e4cf8..df693b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.api.context;
 
 import java.io.Serializable;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -26,6 +27,7 @@ import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
 
@@ -48,4 +50,6 @@ public interface IHyracksTaskContext
     void setSharedObject(Object object);
 
     Object getSharedObject();
+
+    Set<JobFlag> getJobFlags();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
new file mode 100644
index 0000000..bf54e01
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class EnforceFrameWriter implements IFrameWriter {
+
+    // The downstream data consumer of this writer.
+    private final IFrameWriter writer;
+
+    // A flag that indicates whether the data consumer of this writer has failed.
+    private boolean downstreamFailed = false;
+
+    // A flag that indicates whether the the data producer of this writer has called fail() for this writer.
+    // There could be two cases:
+    // CASE 1: the downstream of this writer fails and the exception is propagated to the source operator, which
+    //         cascades to the fail() of this writer;
+    // CASE 2: the failure happens in the upstream of this writer and the source operator cascades to the fail()
+    //         of this writer.
+    private boolean failCalledByUpstream = false;
+
+    // A flag that indicates whether the downstream of this writer is open.
+    private boolean downstreamOpen = false;
+
+    protected EnforceFrameWriter(IFrameWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public final void open() throws HyracksDataException {
+        try {
+            if (downstreamOpen) {
+                throw HyracksDataException.create(ErrorCode.OPEN_ON_OPEN_WRITER);
+            }
+            if (downstreamFailed || failCalledByUpstream) {
+                throw HyracksDataException.create(ErrorCode.OPEN_ON_FAILED_WRITER);
+            }
+            writer.open();
+            downstreamOpen = true;
+        } catch (Throwable th) {
+            downstreamFailed = true;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (!downstreamOpen) {
+            throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_CLOSED_WRITER);
+        }
+        if (downstreamFailed || failCalledByUpstream) {
+            throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_FAILED_WRITER);
+        }
+        try {
+            writer.nextFrame(buffer);
+        } catch (Throwable th) {
+            downstreamFailed = true;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void flush() throws HyracksDataException {
+        if (!downstreamOpen) {
+            throw HyracksDataException.create(ErrorCode.FLUSH_ON_CLOSED_WRITER);
+        }
+        if (downstreamFailed || failCalledByUpstream) {
+            throw HyracksDataException.create(ErrorCode.FLUSH_ON_FAILED_WRITER);
+        }
+        try {
+            writer.flush();
+        } catch (Throwable th) {
+            downstreamFailed = true;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void fail() throws HyracksDataException {
+        writer.fail();
+        if (failCalledByUpstream) {
+            throw HyracksDataException.create(ErrorCode.FAIL_ON_FAILED_WRITER);
+        }
+        failCalledByUpstream = true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+        downstreamOpen = false;
+        if (downstreamFailed && !failCalledByUpstream) {
+            throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL);
+        }
+    }
+
+    public static IFrameWriter enforce(IFrameWriter writer) {
+        return writer instanceof EnforceFrameWriter ? writer : new EnforceFrameWriter(writer);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
index f6c201e..82433e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -23,16 +23,15 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IOperatorNodePushable {
-    public void initialize() throws HyracksDataException;
+    void initialize() throws HyracksDataException;
 
-    public void deinitialize() throws HyracksDataException;
+    void deinitialize() throws HyracksDataException;
 
-    public int getInputArity();
+    int getInputArity();
 
-    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
-            throws HyracksDataException;
+    void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException;
 
-    public IFrameWriter getInputFrameWriter(int index);
+    IFrameWriter getInputFrameWriter(int index);
 
-    public String getDisplayName();
+    String getDisplayName();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b52a6a5..8f36fcd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -92,6 +92,14 @@ public class ErrorCode {
     public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
     public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
     public static final int TASK_ABORTED = 58;
+    public static final int OPEN_ON_OPEN_WRITER = 59;
+    public static final int OPEN_ON_FAILED_WRITER = 60;
+    public static final int NEXT_FRAME_ON_FAILED_WRITER = 61;
+    public static final int NEXT_FRAME_ON_CLOSED_WRITER = 62;
+    public static final int FLUSH_ON_FAILED_WRITER = 63;
+    public static final int FLUSH_ON_CLOSED_WRITER = 64;
+    public static final int FAIL_ON_FAILED_WRITER = 65;
+    public static final int MISSED_FAIL_CALL = 66;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index a33c6c9..7225cd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,5 +19,6 @@
 package org.apache.hyracks.api.job;
 
 public enum JobFlag {
-    PROFILE_RUNTIME
+    PROFILE_RUNTIME,
+    ENFORCE_CONTRACT
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 314bf8b..7fdf106 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,12 +35,14 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -90,18 +92,18 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
     private void init() throws HyracksDataException {
         Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
         List<IConnectorDescriptor> outputConnectors;
-
+        final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         /*
          * Set up the source operators
          */
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
-                    nPartitions);
+            IOperatorNodePushable opPushable =
+                    entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
             operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
             inputArity += opPushable.getInputArity();
-            outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(),
-                    Collections.emptyList());
+            outputConnectors =
+                    MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList());
             for (IConnectorDescriptor conn : outputConnectors) {
                 childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
             }
@@ -131,7 +133,9 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
             /*
              * construct the dataflow connection from a producer to a consumer
              */
-            sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+            IFrameWriter writer = destOp.getInputFrameWriter(inputChannel);
+            writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+            sourceOp.setOutputFrameWriter(outputChannel, writer,
                     recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
 
             /*

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 35a2fc5..4bf069c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -75,6 +75,13 @@
 56 = LSM disk component scan is not allowed for a secondary index
 57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
 58 = Task %1$s was aborted
+59 = Data pipeline protocol violation: open() is called on a opened writer
+60 = Data pipeline protocol violation: open() is called on a failed writer
+61 = Data pipeline protocol violation: nextFrame() is called on a failed writer
+62 = Data pipeline protocol violation: nextFrame() is called on a closed writer
+63 = Data pipeline protocol violation: flush() is called on a failed writer
+64 = Data pipeline protocol violation: flush() is called on a closed writer
+65 = Data pipeline protocol violation: fail() is called twice on a writer
+66 = Data pipeline protocol violation: fail() is not called by the upstream when there is a failure in the downstream
 
-# 10000 ---- 19999: compilation errors
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 55a7a82..95a6d9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@ package org.apache.hyracks.control.cc.job;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -116,10 +115,10 @@ public class JobRun implements IJobStatusConditionVariable {
     }
 
     //Run a Pre-distributed job by passing the JobId
-    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
             PreDistributedJobDescriptor distributedJobDescriptor)
             throws HyracksException {
-        this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
+        this(deploymentId, jobId, jobFlags,
                 distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
         Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
         this.scheduler = new JobExecutor(ccs, this, constaints, true);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index 2dbb631..e083d2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -68,7 +68,7 @@ public class JobStartWork extends SynchronizableWork {
                 run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
             } else {
                 //ActivityClusterGraph has already been distributed
-                run = new JobRun(ccs, deploymentId, jobId,
+                run = new JobRun(ccs, deploymentId, jobId, jobFlags,
                         ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
             }
             jobManager.add(run);


Mime
View raw message