asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [3/3] asterixdb git commit: [ASTERIXDB-1911][HYR, RT, CLUS] Fixes and Improvements for Deployed Jobs
Date Wed, 15 Nov 2017 01:03:06 GMT
[ASTERIXDB-1911][HYR,RT,CLUS] Fixes and Improvements for Deployed Jobs

Rename Predistributed Jobs to Deployed Jobs
Enable job executions to have a map of job parameters
Add an Asterix function to retrieve these parameters
which are can be assigned when the job is run, e.g. for Deployed jobs
Allow Deployed jobs to have new TxnIds and JobIds for each execution
Allow simultaneous execution of one Deployed Job

Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2045
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e6f426b8
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e6f426b8
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e6f426b8

Branch: refs/heads/master
Commit: e6f426b8019957dd15962926788edbe655218cb9
Parents: 592af65
Author: Steven Glenn Jacobs <sjaco002@ucr.edu>
Authored: Tue Nov 14 13:56:19 2017 -0800
Committer: Steven Jacobs <sjaco002@ucr.edu>
Committed: Tue Nov 14 17:02:17 2017 -0800

----------------------------------------------------------------------
 asterixdb/asterix-active/pom.xml                |   5 +
 .../asterix/active/DeployedJobService.java      | 108 ++++++++++
 .../asterix/app/translator/QueryTranslator.java |  10 +-
 .../app/bootstrap/TestNodeController.java       |   3 +
 .../common/api/IJobEventListenerFactory.java    |  30 +++
 .../statement/CreateFunctionStatement.java      |   6 +-
 .../lang/common/visitor/FormatPrintVisitor.java |   7 +-
 .../asterix/metadata/utils/DatasetUtil.java     |   3 +-
 .../asterix/om/functions/BuiltinFunctions.java  |   6 +
 .../AbstractUnaryStringStringEval.java          |   3 +-
 .../GetJobParameterByNameDescriptor.java        |  77 +++++++
 .../runtime/functions/FunctionCollection.java   |   8 +-
 .../job/listener/JobEventListenerFactory.java   |  30 ++-
 ...tiTransactionJobletEventListenerFactory.java |  20 +-
 .../LockThenSearchOperationCallbackFactory.java |   6 +-
 ...exInstantSearchOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...maryIndexSearchOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 .../UpsertOperationCallbackFactory.java         |   6 +-
 .../runtime/CommitRuntimeFactory.java           |   8 +-
 .../client/HyracksClientInterfaceFunctions.java |  52 +++--
 .../HyracksClientInterfaceRemoteProxy.java      |  21 +-
 .../hyracks/api/client/HyracksConnection.java   |  19 +-
 .../api/client/IHyracksClientConnection.java    |  22 +-
 .../api/client/IHyracksClientInterface.java     |   7 +-
 .../impl/ActivityClusterGraphBuilder.java       |   6 +-
 ...ionActivityClusterGraphGeneratorFactory.java |   7 +-
 .../api/context/IHyracksJobletContext.java      |   3 +
 .../api/context/IHyracksTaskContext.java        |   3 +
 .../hyracks/api/exceptions/ErrorCode.java       |   6 +-
 .../hyracks/api/job/ActivityClusterId.java      |  19 +-
 .../hyracks/api/job/DeployedJobSpecId.java      |  91 ++++++++
 .../api/job/DeployedJobSpecIdFactory.java       |  34 +++
 .../IActivityClusterGraphGeneratorFactory.java  |   2 +-
 .../api/job/IJobletEventListenerFactory.java    |   7 +-
 .../hyracks/api/job/JobParameterByteStore.java  |  64 ++++++
 .../hyracks/control/cc/ClientInterfaceIPCI.java |  36 ++--
 .../control/cc/ClusterControllerIPCI.java       |  61 +++---
 .../control/cc/ClusterControllerService.java    |  36 +++-
 .../control/cc/DeployedJobSpecStore.java        | 102 +++++++++
 .../control/cc/PreDistributedJobStore.java      | 104 ----------
 .../cc/dataset/DatasetDirectoryService.java     |  12 +-
 .../control/cc/executor/JobExecutor.java        |  16 +-
 .../hyracks/control/cc/job/JobManager.java      |   5 +-
 .../apache/hyracks/control/cc/job/JobRun.java   |  17 +-
 .../control/cc/work/DeployJobSpecWork.java      |  76 +++++++
 .../control/cc/work/DeployedJobFailureWork.java |  39 ++++
 .../hyracks/control/cc/work/DestroyJobWork.java |  52 -----
 .../control/cc/work/DistributeJobWork.java      |  80 -------
 .../cc/work/DistributedJobFailureWork.java      |  39 ----
 .../hyracks/control/cc/work/JobStartWork.java   |  26 ++-
 .../control/cc/work/UndeployJobSpecWork.java    |  53 +++++
 .../control/common/base/IClusterController.java |   3 +-
 .../control/common/base/INodeController.java    |   9 +-
 .../control/common/ipc/CCNCFunctions.java       | 100 +++++++--
 .../ipc/ClusterControllerRemoteProxy.java       |  26 ++-
 .../common/ipc/NodeControllerRemoteProxy.java   |  28 ++-
 .../org/apache/hyracks/control/nc/Joblet.java   |  16 +-
 .../hyracks/control/nc/NodeControllerIPCI.java  |  18 +-
 .../control/nc/NodeControllerService.java       |  50 +++--
 .../org/apache/hyracks/control/nc/Task.java     |   3 +
 .../control/nc/work/CleanupJobletWork.java      |   1 +
 .../control/nc/work/DeployJobSpecWork.java      |  62 ++++++
 .../hyracks/control/nc/work/DestroyJobWork.java |  54 -----
 .../control/nc/work/DistributeJobWork.java      |  63 ------
 .../hyracks/control/nc/work/StartTasksWork.java |  33 ++-
 .../control/nc/work/UndeployJobSpecWork.java    |  54 +++++
 .../tests/integration/DeployedJobSpecsTest.java | 206 +++++++++++++++++++
 .../integration/PredistributedJobsTest.java     | 186 -----------------
 .../hyracks/test/support/TestJobletContext.java |   8 +-
 .../hyracks/test/support/TestTaskContext.java   |   4 +
 74 files changed, 1554 insertions(+), 859 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-active/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 5865cd1..6b0c381 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,6 +31,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-transactions</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
new file mode 100644
index 0000000..070d838
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Provides functionality for running DeployedJobSpecs
+ */
+public class DeployedJobService {
+
+    private static final Logger LOGGER = Logger.getLogger(DeployedJobService.class.getName());
+
+    //To enable new Asterix TxnId for separate deployed job spec invocations
+    private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
+
+    //pool size one (only running one thread at a time)
+    private static final int POOL_SIZE = 1;
+
+    //Starts running a deployed job specification periodically with an interval of "duration" seconds
+    public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
+            IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId) {
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (!runRepetitiveDeployedJobSpec(distributedId, hcc, jobParameters, duration, entityId)) {
+                        scheduledExecutorService.shutdown();
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
+                            + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
+                }
+            }
+        }, duration, duration, TimeUnit.MILLISECONDS);
+        return scheduledExecutorService;
+    }
+
+    public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, long duration, EntityId entityId) throws Exception {
+        long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, jobParameters, entityId);
+        if (executionMilliseconds > duration && LOGGER.isLoggable(Level.SEVERE)) {
+            LOGGER.log(Level.SEVERE,
+                    "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+                            + entityId.getEntityName() + " was unable to meet the required period of " + duration
+                            + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown"
+                            + new Date());
+            return false;
+        }
+        return true;
+    }
+
+    public synchronized static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, EntityId entityId) throws Exception {
+        JobId jobId;
+        long startTime = Instant.now().toEpochMilli();
+
+        //Add the Asterix Transaction Id to the map
+        jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, String.valueOf(TxnIdFactory.create().getId()).getBytes());
+        jobId = hcc.startJob(distributedId, jobParameters);
+
+        hcc.waitForCompletion(jobId);
+        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
+
+        LOGGER.log(Level.INFO,
+                "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
+                        + "."
+                        + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");
+
+        return executionMilliseconds;
+
+    }
+
+    @Override
+    public String toString() {
+        return "DeployedJobSpecService";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a811454..a52d765 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -744,7 +744,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         return DatasetUtil.createNodeGroupForNewDataset(dataverseName, datasetName, selectedNodes, metadataProvider);
     }
 
-    protected void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
         String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
@@ -1672,9 +1672,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
     protected void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-        String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
-        cfs.getSignature().setNamespace(dataverse);
-        String functionName = cfs.getaAterixFunction().getName();
+        String dataverse = getActiveDataverseName(cfs.getFunctionSignature().getNamespace());
+        cfs.getFunctionSignature().setNamespace(dataverse);
+        String functionName = cfs.getFunctionSignature().getName();
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1685,7 +1685,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             if (dv == null) {
                 throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
             }
-            Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(),
+            Function function = new Function(dataverse, functionName, cfs.getFunctionSignature().getArity(),
                     cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
                     rewriterFactory instanceof SqlppRewriterFactory ? Function.LANGUAGE_SQLPP : Function.LANGUAGE_AQL,
                     FunctionKind.SCALAR.toString());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index df8eef7..90b9a1e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -58,6 +58,7 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
@@ -298,6 +299,8 @@ public class TestNodeController {
         }
         JobId jobId = newJobId();
         IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+        JobEventListenerFactory factory = new JobEventListenerFactory(new TxnId(jobId.getId()), true);
+        Mockito.when(jobletCtx.getJobletEventListenerFactory()).thenReturn(factory);
         Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
         Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
         ctx = Mockito.spy(ctx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
new file mode 100644
index 0000000..0f37b13
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+
+/**
+ * an interface for JobEventListenerFactories to add Asterix transaction JobId getter
+ */
+public interface IJobEventListenerFactory extends IJobletEventListenerFactory {
+
+    TxnId getTxnId(TxnId compiledTxnId);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index 6d74957..84d66ce 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -34,10 +34,6 @@ public class CreateFunctionStatement implements Statement {
     private final boolean ifNotExists;
     private final List<String> paramList;
 
-    public FunctionSignature getaAterixFunction() {
-        return signature;
-    }
-
     public String getFunctionBody() {
         return functionBody;
     }
@@ -66,7 +62,7 @@ public class CreateFunctionStatement implements Statement {
         return paramList;
     }
 
-    public FunctionSignature getSignature() {
+    public FunctionSignature getFunctionSignature() {
         return signature;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index a2e7341..c74ee5d 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -232,7 +232,7 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> {
             out.print("(");
             exprList.get(0).accept(this, step + 1);
             for (int i = 1; i < exprList.size(); i++) {
-                OperatorType opType = opList.get(i - 1);;
+                OperatorType opType = opList.get(i - 1);
                 if (i == 1) {
                     printHints(operatorExpr.getHints(), step + 1);
                 }
@@ -657,7 +657,7 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> {
         out.print(skip(step) + CREATE + " index ");
         out.print(normalize(cis.getIndexName().getValue()) + " ");
         out.print(generateIfNotExists(cis.getIfNotExists()));
-        out.print(" on ");;
+        out.print(" on ");
         out.print(generateFullName(cis.getDataverseName(), cis.getDatasetName()));
 
         out.print(" (");
@@ -827,7 +827,8 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> {
     public Void visit(CreateFunctionStatement cfs, Integer step) throws CompilationException {
         out.print(skip(step) + CREATE + " function ");
         out.print(generateIfNotExists(cfs.getIfNotExists()));
-        out.print(this.generateFullName(cfs.getSignature().getNamespace(), cfs.getSignature().getName()));
+        out.print(
+                this.generateFullName(cfs.getFunctionSignature().getNamespace(), cfs.getFunctionSignature().getName()));
         out.print("(");
         printDelimitedStrings(cfs.getParamList(), COMMA);
         out.println(") {");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 07d3c69..e4c4860 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -205,7 +205,6 @@ public class DatasetUtil {
         return new Pair<>(mergePolicyFactory, properties);
     }
 
-    @SuppressWarnings("unchecked")
     public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
             throws HyracksDataException {
         IARecordBuilder propertyRecordBuilder = new RecordBuilder();
@@ -399,7 +398,7 @@ public class DatasetUtil {
                 metadataProvider.getSplitProviderAndConstraints(dataset);
 
         // prepare callback
-        TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
+        TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(null);
         int[] primaryKeyFields = new int[numKeys];
         for (int i = 0; i < numKeys; i++) {
             primaryKeyFields[i] = i;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index cdb0dc0..e59c600 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -864,6 +864,9 @@ public class BuiltinFunctions {
     public static final FunctionIdentifier EXTERNAL_LOOKUP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "external-lookup", FunctionIdentifier.VARARGS);
 
+    public static final FunctionIdentifier GET_JOB_PARAMETER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "get-job-param", 1);
+
     public static final FunctionIdentifier META = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta",
             FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier META_KEY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key",
@@ -1274,6 +1277,9 @@ public class BuiltinFunctions {
         // external lookup
         addPrivateFunction(EXTERNAL_LOOKUP, AnyTypeComputer.INSTANCE, false);
 
+        // get job parameter
+        addFunction(GET_JOB_PARAMETER, AnyTypeComputer.INSTANCE, false);
+
         // unnesting function
         addPrivateFunction(SCAN_COLLECTION, CollectionMemberResultType.INSTANCE, true);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
index 1f9909c..95b6ef6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
@@ -22,8 +22,8 @@ package org.apache.asterix.runtime.evaluators.functions;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -57,7 +57,6 @@ abstract class AbstractUnaryStringStringEval implements IScalarEvaluator {
         this.funcID = funcID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void evaluate(IFrameTupleReference tuple, IPointable resultPointable) throws HyracksDataException {
         resultStorage.reset();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
new file mode 100644
index 0000000..17f7a96
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class GetJobParameterByNameDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new GetJobParameterByNameDescriptor();
+        }
+    };
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws HyracksDataException {
+                return new AbstractUnaryStringStringEval(ctx, args[0],
+                        GetJobParameterByNameDescriptor.this.getIdentifier()) {
+                    private byte[] result;
+
+                    @Override
+                    protected void process(UTF8StringPointable inputString, IPointable resultPointable)
+                            throws IOException {
+                        result = ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
+                                inputString.getLength());
+                    }
+
+                    @Override
+                    void writeResult(IPointable resultPointable) throws IOException {
+                        resultPointable.set(result, 0, result.length);
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.GET_JOB_PARAMETER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 5acbeb9..f3eb6c9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -158,6 +158,7 @@ import org.apache.asterix.runtime.evaluators.functions.EditDistanceStringIsFilte
 import org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.FullTextContainsWithoutOptionDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GetItemDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.GetJobParameterByNameDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GramTokensDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.HashedGramTokensDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.HashedWordTokensDescriptor;
@@ -437,6 +438,9 @@ public final class FunctionCollection {
         // Inject failure function
         fc.add(InjectFailureDescriptor.FACTORY);
 
+        // Get Job Parameter function
+        fc.add(GetJobParameterByNameDescriptor.FACTORY);
+
         // Switch case
         fc.add(SwitchCaseDescriptor.FACTORY);
 
@@ -737,8 +741,8 @@ public final class FunctionCollection {
      */
     private static IFunctionDescriptorFactory getGeneratedFunctionDescriptorFactory(Class<?> cl) {
         try {
-            String className = CodeGenHelper.getGeneratedClassName(cl.getName(),
-                    CodeGenHelper.DEFAULT_SUFFIX_FOR_GENERATED_CLASS);
+            String className =
+                    CodeGenHelper.getGeneratedClassName(cl.getName(), CodeGenHelper.DEFAULT_SUFFIX_FOR_GENERATED_CLASS);
             Class<?> generatedCl = cl.getClassLoader().loadClass(className);
             Field factory = generatedCl.getDeclaredField(FACTORY);
             return (IFunctionDescriptorFactory) factory.get(null);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 1422e42..23ad1e1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
@@ -27,14 +28,19 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.JobStatus;
 
-public class JobEventListenerFactory implements IJobletEventListenerFactory {
+public class JobEventListenerFactory implements IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
-    private final TxnId txnId;
+
+    private TxnId txnId;
     private final boolean transactionalWrite;
 
+    //To enable new Asterix TxnId for separate deployed job spec invocations
+    private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
+
     public JobEventListenerFactory(TxnId txnId, boolean transactionalWrite) {
         this.txnId = txnId;
         this.transactionalWrite = transactionalWrite;
@@ -45,6 +51,26 @@ public class JobEventListenerFactory implements IJobletEventListenerFactory {
     }
 
     @Override
+    public TxnId getTxnId(TxnId compiledTxnId) {
+        return txnId;
+    }
+
+    @Override
+    public IJobletEventListenerFactory copyFactory() {
+        return new JobEventListenerFactory(txnId, transactionalWrite);
+    }
+
+    @Override
+    public void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore) {
+        String AsterixTransactionIdString =
+                new String(jobParameterByteStore.getParameterValue(TRANSACTION_ID_PARAMETER_NAME, 0,
+                        TRANSACTION_ID_PARAMETER_NAME.length));
+        if (AsterixTransactionIdString.length() > 0) {
+            this.txnId = new TxnId(Integer.parseInt(AsterixTransactionIdString));
+        }
+    }
+
+    @Override
     public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) {
 
         return new IJobletEventListener() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index a63f3ca..23c86f3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -20,6 +20,7 @@ package org.apache.asterix.runtime.job.listener;
 
 import java.util.List;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
@@ -29,13 +30,14 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.JobStatus;
 
 /**
  * This Joblet enable transactions on multiple datasets to take place in the same Hyracks Job
  * It takes a list of Transaction job ids instead of a single job Id
  */
-public class MultiTransactionJobletEventListenerFactory implements IJobletEventListenerFactory {
+public class MultiTransactionJobletEventListenerFactory implements IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
     private final List<TxnId> txnIds;
@@ -46,6 +48,22 @@ public class MultiTransactionJobletEventListenerFactory implements IJobletEventL
         this.transactionalWrite = transactionalWrite;
     }
 
+    //TODO: Enable this factory to be usable for Deployed Jobs
+    @Override
+    public TxnId getTxnId(TxnId compiledTxnId) {
+        return compiledTxnId;
+    }
+
+    @Override
+    public IJobletEventListenerFactory copyFactory() {
+        return new MultiTransactionJobletEventListenerFactory(txnIds, transactionalWrite);
+    }
+
+    @Override
+    public void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore) {
+        //no op
+    }
+
     @Override
     public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 73b9b41..3f3dbd9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -28,6 +29,7 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 
 public class LockThenSearchOperationCallbackFactory extends AbstractOperationCallbackFactory
@@ -45,7 +47,9 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal
             IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             return new LockThenSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem, txnCtx,
                     operatorNodePushable);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index dbf58e4..93108f9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -29,6 +30,7 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
@@ -47,7 +49,9 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index b8c6084..fb01952 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
@@ -66,7 +68,9 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 49490a1..076b0d9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -29,6 +30,7 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
@@ -47,7 +49,9 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio
             IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 1847e80..5882046 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -62,7 +64,9 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 38adf2b..79ce788 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -63,7 +65,9 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 5e499fc..8a27914 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
@@ -65,7 +67,9 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, indexOp);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index be75ab9..dfd3eb1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -31,6 +32,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -62,7 +64,9 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 2e43957..58f7e69 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -19,11 +19,13 @@
 
 package org.apache.asterix.transaction.management.runtime;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 
 public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
@@ -55,7 +57,9 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
-            return new CommitRuntime(ctx, txnId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+        return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId,
+                primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
+                datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 95479c1..23c41fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -22,10 +22,13 @@ import java.io.Serializable;
 import java.net.URL;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 
@@ -104,12 +107,12 @@ public class HyracksClientInterfaceFunctions {
         }
     }
 
-    public static class DistributeJobFunction extends Function {
+    public static class DeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
         private final byte[] acggfBytes;
 
-        public DistributeJobFunction(byte[] acggfBytes) {
+        public DeployJobSpecFunction(byte[] acggfBytes) {
             this.acggfBytes = acggfBytes;
         }
 
@@ -145,13 +148,13 @@ public class HyracksClientInterfaceFunctions {
         }
     }
 
-    public static class DestroyJobFunction extends Function {
+    public static class UndeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
 
-        public DestroyJobFunction(JobId jobId) {
-            this.jobId = jobId;
+        public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+            this.deployedJobSpecId = deployedJobSpecId;
         }
 
         @Override
@@ -159,8 +162,8 @@ public class HyracksClientInterfaceFunctions {
             return FunctionId.DESTROY_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
     }
 
@@ -168,27 +171,30 @@ public class HyracksClientInterfaceFunctions {
         private static final long serialVersionUID = 1L;
 
         private final byte[] acggfBytes;
-        private final EnumSet<JobFlag> jobFlags;
+        private final Set<JobFlag> jobFlags;
         private final DeploymentId deploymentId;
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
+        private final Map<byte[], byte[]> jobParameters;
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags,
+                DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
             this.acggfBytes = acggfBytes;
             this.jobFlags = jobFlags;
             this.deploymentId = deploymentId;
-            this.jobId = jobId;
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.jobParameters = jobParameters;
         }
 
-        public StartJobFunction(JobId jobId) {
-            this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
+        public StartJobFunction(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
+            this(null, null, EnumSet.noneOf(JobFlag.class), deployedJobSpecId, jobParameters);
         }
 
-        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(null, acggfBytes, jobFlags, null);
+        public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(null, acggfBytes, jobFlags, null, null);
         }
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(deploymentId, acggfBytes, jobFlags, null);
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(deploymentId, acggfBytes, jobFlags, null, null);
         }
 
         @Override
@@ -196,15 +202,19 @@ public class HyracksClientInterfaceFunctions {
             return FunctionId.START_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
+        }
+
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
 
         public byte[] getACGGFBytes() {
             return acggfBytes;
         }
 
-        public EnumSet<JobFlag> getJobFlags() {
+        public Set<JobFlag> getJobFlags() {
             return jobFlags;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0ded84f..eddcaa5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
@@ -76,9 +77,9 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
+                new HyracksClientInterfaceFunctions.StartJobFunction(deployedJobSpecId, jobParameters);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -90,17 +91,17 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
     }
 
     @Override
-    public JobId distributeJob(byte[] acggfBytes) throws Exception {
-        HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
-                new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
-        return (JobId) rpci.call(ipcHandle, sjf);
+    public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception {
+        HyracksClientInterfaceFunctions.DeployJobSpecFunction sjf =
+                new HyracksClientInterfaceFunctions.DeployJobSpecFunction(acggfBytes);
+        return (DeployedJobSpecId) rpci.call(ipcHandle, sjf);
     }
 
     @Override
-    public JobId destroyJob(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
-                new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
-        return (JobId) rpci.call(ipcHandle, sjf);
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+        HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf =
+                new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId);
+        return (DeployedJobSpecId) rpci.call(ipcHandle, sjf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index e979da6..85ef927 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGe
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -109,20 +110,20 @@ public final class HyracksConnection implements IHyracksClientConnection {
     }
 
     @Override
-    public JobId distributeJob(JobSpecification jobSpec) throws Exception {
-        IActivityClusterGraphGeneratorFactory jsacggf =
+    public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
                 new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
-        return distributeJob(jsacggf);
+        return deployJobSpec(jsacggf);
     }
 
     @Override
-    public JobId destroyJob(JobId jobId) throws Exception {
-        return hci.destroyJob(jobId);
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+        return hci.undeployJobSpec(deployedJobSpecId);
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
-        return hci.startJob(jobId);
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
+        return hci.startJob(deployedJobSpecId, jobParameters);
     }
 
     @Override
@@ -130,8 +131,8 @@ public final class HyracksConnection implements IHyracksClientConnection {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
-    public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
-        return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
+    public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
+        return hci.deployJobSpec(JavaSerializationUtils.serialize(acggf));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index a7c1d75..510a6b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -20,9 +20,11 @@ package org.apache.hyracks.api.client;
 
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -94,25 +96,27 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Flags
      * @throws Exception
      */
-    JobId distributeJob(JobSpecification jobSpec) throws Exception;
+    DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
 
     /**
-     * Destroy the distributed graph for a pre-distributed job
+     * Remove the deployed Job Spec
      *
-     * @param jobId
-     *            The id of the predistributed job
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
      * @throws Exception
      */
-    JobId destroyJob(JobId jobId) throws Exception;
+    DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     /**
-     * Used to run a pre-distributed job by id (the same JobId will be returned)
+     * Used to run a deployed Job Spec by id
      *
-     * @param jobId
-     *            The id of the predistributed job
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
+     * @param jobParameters
+     *            The serialized job parameters
      * @throws Exception
      */
-    JobId startJob(JobId jobId) throws Exception;
+    JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
 
     /**
      * Start the specified Job.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 9cebd3e..f0c7872 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
@@ -38,13 +39,13 @@ public interface IHyracksClientInterface {
 
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 
-    public JobId startJob(JobId jobId) throws Exception;
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
 
     public void cancelJob(JobId jobId) throws Exception;
 
-    public JobId distributeJob(byte[] acggfBytes) throws Exception;
+    public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
 
-    public JobId destroyJob(JobId jobId) throws Exception;
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
index 7dd5fe9..0b2cc9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -28,7 +28,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,7 +35,6 @@ import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.ActivityClusterId;
 import org.apache.hyracks.api.job.JobActivityGraph;
-import org.apache.hyracks.api.job.JobId;
 
 public class ActivityClusterGraphBuilder {
     private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
@@ -70,7 +68,7 @@ public class ActivityClusterGraphBuilder {
         return null;
     }
 
-    public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) {
+    public ActivityClusterGraph inferActivityClusters(JobActivityGraph jag) {
         /*
          * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
          */
@@ -99,7 +97,7 @@ public class ActivityClusterGraphBuilder {
         Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
         List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
         for (Set<ActivityId> stage : stages) {
-            ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++));
+            ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(acCounter++));
             acList.add(ac);
             for (ActivityId aid : stage) {
                 IActivity activity = activityNodeMap.get(aid);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index c712b36..ddf0ce8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -32,7 +32,6 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobActivityGraph;
 import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.rewriter.ActivityClusterGraphRewriter;
 
@@ -51,8 +50,8 @@ public class JobSpecificationActivityClusterGraphGeneratorFactory implements IAc
     }
 
     @Override
-    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
-            final ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException {
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(final ICCServiceContext ccServiceCtx,
+            Set<JobFlag> jobFlags) throws HyracksException {
         final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
         PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
             @Override
@@ -70,7 +69,7 @@ public class JobSpecificationActivityClusterGraphGeneratorFactory implements IAc
         final JobActivityGraph jag = builder.getActivityGraph();
         ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
 
-        final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+        final ActivityClusterGraph acg = acgb.inferActivityClusters(jag);
         acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
         acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 6eb0adb..1100335 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.api.context;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
@@ -34,6 +35,8 @@ public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocat
 
     Object getGlobalJobData();
 
+    IJobletEventListenerFactory getJobletEventListenerFactory();
+
     Class<?> loadClass(String className) throws HyracksException;
 
     ClassLoader getClassLoader() throws HyracksException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 10bb336..bf42d0c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
 import org.apache.hyracks.api.job.JobFlag;
@@ -52,6 +53,8 @@ public interface IHyracksTaskContext
 
     Object getSharedObject();
 
+    public byte[] getJobParameter(byte[] name, int start, int length) throws HyracksException;
+
     Set<JobFlag> getJobFlags();
 
     IStatsCollector getStatsCollector();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7926469..eaf9bbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -54,9 +54,9 @@ public class ErrorCode {
     public static final int INCONSISTENT_RESULT_METADATA = 18;
     public static final int CANNOT_DELETE_FILE = 19;
     public static final int NOT_A_JOBID = 20;
-    public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
-    public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
-    public static final int DISTRIBUTED_JOB_FAILURE = 23;
+    public static final int ERROR_FINDING_DEPLOYED_JOB = 21;
+    public static final int DUPLICATE_DEPLOYED_JOB = 22;
+    public static final int DEPLOYED_JOB_FAILURE = 23;
     public static final int NO_RESULT_SET = 24;
     public static final int JOB_CANCELED = 25;
     public static final int NODE_FAILED = 26;


Mime
View raw message