asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [2/3] asterixdb git commit: [ASTERIXDB-1911][HYR, RT, CLUS] Fixes and Improvements for Deployed Jobs
Date Wed, 15 Nov 2017 01:03:05 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
index e0c5279..84a754a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
@@ -23,19 +23,12 @@ import java.io.Serializable;
 public final class ActivityClusterId implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
-
     private final int id;
 
-    public ActivityClusterId(JobId jobId, int id) {
-        this.jobId = jobId;
+    public ActivityClusterId(int id) {
         this.id = id;
     }
 
-    public JobId getJobId() {
-        return jobId;
-    }
-
     public int getId() {
         return id;
     }
@@ -45,7 +38,6 @@ public final class ActivityClusterId implements Serializable {
         final int prime = 31;
         int result = 1;
         result = prime * result + id;
-        result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
         return result;
     }
 
@@ -64,18 +56,11 @@ public final class ActivityClusterId implements Serializable {
         if (id != other.id) {
             return false;
         }
-        if (jobId == null) {
-            if (other.jobId != null) {
-                return false;
-            }
-        } else if (!jobId.equals(other.jobId)) {
-            return false;
-        }
         return true;
     }
 
     @Override
     public String toString() {
-        return "ACID:" + jobId + ":" + id;
+        return "ACID:" + id;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java
new file mode 100644
index 0000000..8cbfb1a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IWritable;
+
+public final class DeployedJobSpecId implements IWritable, Serializable {
+
+    public static final DeployedJobSpecId INVALID = new DeployedJobSpecId(-1l);
+
+    private static final long serialVersionUID = 1L;
+    private long id;
+
+    public static DeployedJobSpecId create(DataInput dis) throws IOException {
+        DeployedJobSpecId deployedJobSpecId = new DeployedJobSpecId();
+        deployedJobSpecId.readFields(dis);
+        return deployedJobSpecId;
+    }
+
+    private DeployedJobSpecId() {
+    }
+
+    public DeployedJobSpecId(long id) {
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) id;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof DeployedJobSpecId)) {
+            return false;
+        }
+        return ((DeployedJobSpecId) o).id == id;
+    }
+
+    @Override
+    public String toString() {
+        return "PDJID:" + id;
+    }
+
+    public static DeployedJobSpecId parse(String str) throws HyracksDataException {
+        if (str.startsWith("PDJID:")) {
+            return new DeployedJobSpecId(Long.parseLong(str.substring(4)));
+        }
+        throw HyracksDataException.create(ErrorCode.NOT_A_JOBID, str);
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeLong(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        id = input.readLong();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java
new file mode 100644
index 0000000..24caa9b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class DeployedJobSpecIdFactory {
+    private final AtomicLong id = new AtomicLong(0);
+
+    public DeployedJobSpecId create() {
+        return new DeployedJobSpecId(id.getAndIncrement());
+    }
+
+    public long maxDeployedJobSpecId() {
+        return id.get();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 133e342..d23b944 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -25,7 +25,7 @@ import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 
 public interface IActivityClusterGraphGeneratorFactory extends Serializable {
-    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(
             ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException;
 
     public JobSpecification getJobSpecification();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
index d523ccc..bd2f189 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
@@ -23,5 +23,10 @@ import java.io.Serializable;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 
 public interface IJobletEventListenerFactory extends Serializable {
-    public IJobletEventListener createListener(IHyracksJobletContext ctx);
+    IJobletEventListener createListener(IHyracksJobletContext ctx);
+
+    IJobletEventListenerFactory copyFactory();
+
+    //Allows job parameters to change listener settings
+    void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
new file mode 100644
index 0000000..551b3d7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JobParameterByteStore implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Map<byte[], byte[]> runtimeValues;
+    private final byte[] empty = new byte[0];
+
+    public JobParameterByteStore() {
+        runtimeValues = new HashMap<>();
+    }
+
+    public Map<byte[], byte[]> getParameterMap() {
+        return runtimeValues;
+    }
+
+    public void setParameters(Map<byte[], byte[]> map) {
+        runtimeValues = map;
+    }
+
+    public byte[] getParameterValue(byte[] name, int start, int length) {
+        for (Entry<byte[], byte[]> entry : runtimeValues.entrySet()) {
+            byte[] key = entry.getKey();
+            if (key.length == length) {
+                boolean matched = true;
+                for (int j = 0; j < length; j++) {
+                    if (key[j] != name[j + start]) {
+                        matched = false;
+                        break;
+                    }
+                }
+                if (matched) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return empty;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 327c422..4e3c0f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -24,15 +24,15 @@ import java.util.logging.Logger;
 import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.DeployedJobSpecIdFactory;
 import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.JobInfo;
 import org.apache.hyracks.control.cc.work.CancelJobWork;
 import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
-import org.apache.hyracks.control.cc.work.DestroyJobWork;
-import org.apache.hyracks.control.cc.work.DistributeJobWork;
+import org.apache.hyracks.control.cc.work.DeployJobSpecWork;
 import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobStatusWork;
@@ -42,6 +42,7 @@ import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
 import org.apache.hyracks.control.cc.work.GetResultStatusWork;
 import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
 import org.apache.hyracks.control.cc.work.JobStartWork;
+import org.apache.hyracks.control.cc.work.UndeployJobSpecWork;
 import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
 import org.apache.hyracks.control.common.work.IPCResponder;
 import org.apache.hyracks.ipc.api.IIPCHandle;
@@ -53,10 +54,12 @@ class ClientInterfaceIPCI implements IIPCI {
     private static final Logger LOGGER = Logger.getLogger(ClientInterfaceIPCI.class.getName());
     private final ClusterControllerService ccs;
     private final JobIdFactory jobIdFactory;
+    private final DeployedJobSpecIdFactory deployedJobSpecIdFactory;
 
-    ClientInterfaceIPCI(ClusterControllerService ccs, JobIdFactory jobIdFactory) {
+    ClientInterfaceIPCI(final ClusterControllerService ccs, final JobIdFactory jobIdFactory) {
         this.ccs = ccs;
         this.jobIdFactory = jobIdFactory;
+        this.deployedJobSpecIdFactory = ccs.getDeployedJobSpecIdFactory();
     }
 
     @Override
@@ -83,16 +86,17 @@ class ClientInterfaceIPCI implements IIPCI {
                         new IPCResponder<JobInfo>(handle, mid)));
                 break;
             case DISTRIBUTE_JOB:
-                HyracksClientInterfaceFunctions.DistributeJobFunction djf =
-                        (HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
-                ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory,
-                        new IPCResponder<JobId>(handle, mid)));
+                HyracksClientInterfaceFunctions.DeployJobSpecFunction djf =
+                        (HyracksClientInterfaceFunctions.DeployJobSpecFunction) fn;
+                ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, djf.getACGGFBytes(),
+                        deployedJobSpecIdFactory.create(), new IPCResponder<>(handle, mid)));
                 break;
             case DESTROY_JOB:
-                HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
-                        (HyracksClientInterfaceFunctions.DestroyJobFunction) fn;
+                HyracksClientInterfaceFunctions.UndeployJobSpecFunction dsjf =
+                        (HyracksClientInterfaceFunctions.UndeployJobSpecFunction) fn;
                 ccs.getWorkQueue()
-                        .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
+                        .schedule(new UndeployJobSpecWork(ccs, dsjf.getDeployedJobSpecId(),
+                                new IPCResponder<>(handle, mid)));
                 break;
             case CANCEL_JOB:
                 HyracksClientInterfaceFunctions.CancelJobFunction cjf =
@@ -103,8 +107,14 @@ class ClientInterfaceIPCI implements IIPCI {
             case START_JOB:
                 HyracksClientInterfaceFunctions.StartJobFunction sjf =
                         (HyracksClientInterfaceFunctions.StartJobFunction) fn;
-                ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), sjf.getACGGFBytes(),
-                        sjf.getJobFlags(), sjf.getJobId(), new IPCResponder<JobId>(handle, mid), jobIdFactory));
+                DeployedJobSpecId id = sjf.getDeployedJobSpecId();
+                byte[] acggfBytes = null;
+                if (id == null) {
+                    //The job is new
+                    acggfBytes = sjf.getACGGFBytes();
+                }
+                ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
+                        jobIdFactory, sjf.getJobParameters(), new IPCResponder<>(handle, mid), id));
                 break;
             case GET_DATASET_DIRECTORY_SERIVICE_INFO:
                 ccs.getWorkQueue().schedule(

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index af5c102..5a53fce 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -23,7 +23,7 @@ import java.util.logging.Logger;
 
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
-import org.apache.hyracks.control.cc.work.DistributedJobFailureWork;
+import org.apache.hyracks.control.cc.work.DeployedJobFailureWork;
 import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -76,13 +76,12 @@ class ClusterControllerIPCI implements IIPCI {
                 break;
             case NOTIFY_JOBLET_CLEANUP:
                 CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
-                ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(),
-                        njcf.getNodeId()));
+                ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(), njcf.getNodeId()));
                 break;
             case NOTIFY_DEPLOY_BINARY:
                 CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
-                ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(),
-                        ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+                ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(), ndbf.getNodeId(),
+                        ndbf.getDeploymentStatus()));
                 break;
             case REPORT_PROFILE:
                 CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
@@ -90,49 +89,48 @@ class ClusterControllerIPCI implements IIPCI {
                 break;
             case NOTIFY_TASK_COMPLETE:
                 CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
-                ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(),
-                        ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+                ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(), ntcf.getTaskId(),
+                        ntcf.getNodeId(), ntcf.getStatistics()));
                 break;
             case NOTIFY_TASK_FAILURE:
                 CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
-                ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(),
-                        ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
+                ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(), ntff.getTaskId(),
+                        ntff.getNodeId(), ntff.getExceptions()));
                 break;
-            case DISTRIBUTED_JOB_FAILURE:
-                CCNCFunctions.ReportDistributedJobFailureFunction rdjf =
-                        (CCNCFunctions.ReportDistributedJobFailureFunction) fn;
-                ccs.getWorkQueue().schedule(new DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId()));
+            case DEPLOYED_JOB_FAILURE:
+                CCNCFunctions.ReportDeployedJobSpecFailureFunction rdjf =
+                        (CCNCFunctions.ReportDeployedJobSpecFailureFunction) fn;
+                ccs.getWorkQueue()
+                        .schedule(new DeployedJobFailureWork(rdjf.getDeployedJobSpecId(), rdjf.getNodeId()));
                 break;
             case REGISTER_PARTITION_PROVIDER:
                 CCNCFunctions.RegisterPartitionProviderFunction rppf =
                         (CCNCFunctions.RegisterPartitionProviderFunction) fn;
-                ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs,
-                        rppf.getPartitionDescriptor()));
+                ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs, rppf.getPartitionDescriptor()));
                 break;
             case REGISTER_PARTITION_REQUEST:
                 CCNCFunctions.RegisterPartitionRequestFunction rprf =
                         (CCNCFunctions.RegisterPartitionRequestFunction) fn;
-                ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs,
-                        rprf.getPartitionRequest()));
+                ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs, rprf.getPartitionRequest()));
                 break;
             case REGISTER_RESULT_PARTITION_LOCATION:
                 CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
                         (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
-                ccs.getWorkQueue().schedule(new RegisterResultPartitionLocationWork(ccs,
-                        rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
-                        rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
+                ccs.getWorkQueue()
+                        .schedule(new RegisterResultPartitionLocationWork(ccs, rrplf.getJobId(), rrplf.getResultSetId(),
+                                rrplf.getOrderedResult(), rrplf.getEmptyResult(), rrplf.getPartition(),
+                                rrplf.getNPartitions(), rrplf.getNetworkAddress()));
                 break;
             case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
                 CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwc =
                         (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
-                ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs,
-                        rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition()));
+                ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs, rrpwc.getJobId(),
+                        rrpwc.getResultSetId(), rrpwc.getPartition()));
                 break;
             case SEND_APPLICATION_MESSAGE:
-                CCNCFunctions.SendApplicationMessageFunction rsf =
-                        (CCNCFunctions.SendApplicationMessageFunction) fn;
-                ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs, rsf.getMessage(),
-                        rsf.getDeploymentId(), rsf.getNodeId()));
+                CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                ccs.getWorkQueue().schedule(
+                        new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId()));
                 break;
             case GET_NODE_CONTROLLERS_INFO:
                 ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(),
@@ -150,18 +148,17 @@ class ClusterControllerIPCI implements IIPCI {
                 break;
             case STATE_DUMP_RESPONSE:
                 CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
-                ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs, dsrf.getNodeId(),
-                        dsrf.getStateDumpId(), dsrf.getState()));
+                ccs.getWorkQueue().schedule(
+                        new NotifyStateDumpResponse(ccs, dsrf.getNodeId(), dsrf.getStateDumpId(), dsrf.getState()));
                 break;
             case SHUTDOWN_RESPONSE:
                 CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
                 ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs, sdrf.getNodeId()));
                 break;
             case THREAD_DUMP_RESPONSE:
-                CCNCFunctions.ThreadDumpResponseFunction tdrf =
-                        (CCNCFunctions.ThreadDumpResponseFunction)fn;
-                ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs,
-                        tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
+                CCNCFunctions.ThreadDumpResponseFunction tdrf = (CCNCFunctions.ThreadDumpResponseFunction) fn;
+                ccs.getWorkQueue()
+                        .schedule(new NotifyThreadDumpResponse(ccs, tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
                 break;
             default:
                 LOGGER.warning("Unknown function: " + fn.getFunctionId());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 7b99df2..713bddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -50,7 +50,10 @@ import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecIdFactory;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -107,7 +110,9 @@ public class ClusterControllerService implements IControllerService {
 
     private CCServiceContext serviceCtx;
 
-    private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore();
+    private final DeployedJobSpecStore deployedJobSpecStore = new DeployedJobSpecStore();
+
+    private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>();
 
     private final WorkQueue workQueue;
 
@@ -135,6 +140,8 @@ public class ClusterControllerService implements IControllerService {
 
     private final JobIdFactory jobIdFactory;
 
+    private final DeployedJobSpecIdFactory deployedJobSpecIdFactory;
+
     private IJobManager jobManager;
 
     private ShutdownRun shutdownCallback;
@@ -164,8 +171,8 @@ public class ClusterControllerService implements IControllerService {
         final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
-                ccConfig.getResultSweepThreshold(), preDistributedJobStore);
+        datasetDirectoryService =
+                new DatasetDirectoryService(ccConfig.getResultTTL(), ccConfig.getResultSweepThreshold());
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
@@ -175,6 +182,8 @@ public class ClusterControllerService implements IControllerService {
         nodeManager = new NodeManager(this, ccConfig, resourceManager);
 
         jobIdFactory = new JobIdFactory();
+
+        deployedJobSpecIdFactory = new DeployedJobSpecIdFactory();
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -347,8 +356,21 @@ public class ClusterControllerService implements IControllerService {
         return nodeManager;
     }
 
-    public PreDistributedJobStore getPreDistributedJobStore() throws HyracksException {
-        return preDistributedJobStore;
+    public DeployedJobSpecStore getDeployedJobSpecStore() throws HyracksException {
+        return deployedJobSpecStore;
+    }
+
+    public void removeJobParameterByteStore(JobId jobId) throws HyracksException {
+        jobParameterByteStoreMap.remove(jobId);
+    }
+
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
+        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+        if (jpbs == null) {
+            jpbs = new JobParameterByteStore();
+            jobParameterByteStoreMap.put(jobId, jpbs);
+        }
+        return jpbs;
     }
 
     public IResourceManager getResourceManager() {
@@ -397,6 +419,10 @@ public class ClusterControllerService implements IControllerService {
         return jobIdFactory;
     }
 
+    public DeployedJobSpecIdFactory getDeployedJobSpecIdFactory() {
+        return deployedJobSpecIdFactory;
+    }
+
     private final class ClusterControllerContext implements ICCContext {
         private final ClusterTopology topology;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
new file mode 100644
index 0000000..1a3051e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class DeployedJobSpecStore {
+
+    private final Map<DeployedJobSpecId, DeployedJobSpecDescriptor> deployedJobSpecDescriptorMap;
+
+    public DeployedJobSpecStore() {
+        deployedJobSpecDescriptorMap = new Hashtable<>();
+    }
+
+    public void addDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId,
+            ActivityClusterGraph activityClusterGraph,
+            JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
+                    throws HyracksException {
+        if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
+        }
+        DeployedJobSpecDescriptor descriptor =
+                new DeployedJobSpecDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
+        deployedJobSpecDescriptorMap.put(deployedJobSpecId, descriptor);
+    }
+
+    public void checkForExistingDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+        if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
+        }
+    }
+
+    public DeployedJobSpecDescriptor getDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId)
+            throws HyracksException {
+        DeployedJobSpecDescriptor descriptor = deployedJobSpecDescriptorMap.get(deployedJobSpecId);
+        if (descriptor == null) {
+            throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId);
+        }
+        return descriptor;
+    }
+
+    public void removeDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+        DeployedJobSpecDescriptor descriptor = deployedJobSpecDescriptorMap.get(deployedJobSpecId);
+        if (descriptor == null) {
+            throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId);
+        }
+        deployedJobSpecDescriptorMap.remove(deployedJobSpecId);
+    }
+
+    public class DeployedJobSpecDescriptor {
+
+        private final ActivityClusterGraph activityClusterGraph;
+
+        private final JobSpecification jobSpecification;
+
+        private final Set<Constraint> activityClusterGraphConstraints;
+
+        private DeployedJobSpecDescriptor(ActivityClusterGraph activityClusterGraph,
+                JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) {
+            this.activityClusterGraph = activityClusterGraph;
+            this.jobSpecification = jobSpecification;
+            this.activityClusterGraphConstraints = activityClusterGraphConstraints;
+        }
+
+        public ActivityClusterGraph getActivityClusterGraph() {
+            return activityClusterGraph;
+        }
+
+        public JobSpecification getJobSpecification() {
+            return jobSpecification;
+        }
+
+        public Set<Constraint> getActivityClusterGraphConstraints() {
+            return activityClusterGraphConstraints;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
deleted file mode 100644
index 117621f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc;
-
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class PreDistributedJobStore {
-
-    private final Map<JobId, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
-
-    public PreDistributedJobStore() {
-        preDistributedJobDescriptorMap = new Hashtable<>();
-    }
-
-    public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph activityClusterGraph,
-            JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
-                    throws HyracksException {
-        if (preDistributedJobDescriptorMap.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
-        }
-        PreDistributedJobDescriptor descriptor =
-                new PreDistributedJobDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
-        preDistributedJobDescriptorMap.put(jobId, descriptor);
-    }
-
-    public void checkForExistingDistributedJobDescriptor(JobId jobId) throws HyracksException {
-        if (preDistributedJobDescriptorMap.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
-        }
-    }
-
-    public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId jobId) throws HyracksException {
-        PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
-        if (descriptor == null) {
-            throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
-        }
-        return descriptor;
-    }
-
-    public boolean jobIsPredistributed(JobId jobId) {
-        return preDistributedJobDescriptorMap.get(jobId) != null;
-    }
-
-    public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
-        PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
-        if (descriptor == null) {
-            throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
-        }
-        preDistributedJobDescriptorMap.remove(jobId);
-    }
-
-    public class PreDistributedJobDescriptor {
-
-        private final ActivityClusterGraph activityClusterGraph;
-
-        private final JobSpecification jobSpecification;
-
-        private final Set<Constraint> activityClusterGraphConstraints;
-
-        private PreDistributedJobDescriptor(ActivityClusterGraph activityClusterGraph,
-                JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) {
-            this.activityClusterGraph = activityClusterGraph;
-            this.jobSpecification = jobSpecification;
-            this.activityClusterGraphConstraints = activityClusterGraphConstraints;
-        }
-
-        public ActivityClusterGraph getActivityClusterGraph() {
-            return activityClusterGraph;
-        }
-
-        public JobSpecification getJobSpecification() {
-            return jobSpecification;
-        }
-
-        public Set<Constraint> getActivityClusterGraphConstraints() {
-            return activityClusterGraphConstraints;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ca1c91b..1cb07d0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -42,7 +42,6 @@ import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.control.cc.PreDistributedJobStore;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
@@ -63,14 +62,10 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
 
     private final Map<JobId, JobResultInfo> jobResultLocations;
 
-    private final PreDistributedJobStore preDistributedJobStore;
-
-    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold,
-            PreDistributedJobStore preDistributedJobStore) {
+    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
-        this.preDistributedJobStore = preDistributedJobStore;
-        jobResultLocations = new LinkedHashMap<>();
+        jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
     }
 
     @Override
@@ -186,9 +181,6 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
 
     @Override
     public synchronized long getResultTimestamp(JobId jobId) {
-        if (preDistributedJobStore.jobIsPredistributed(jobId)) {
-            return -1;
-        }
         return getState(jobId).getTimestamp();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 8a69a6f..0b69024 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -47,6 +47,7 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -77,7 +78,7 @@ public class JobExecutor {
 
     private final PartitionConstraintSolver solver;
 
-    private final boolean predistributed;
+    private final DeployedJobSpecId deployedJobSpecId;
 
     private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
 
@@ -88,10 +89,10 @@ public class JobExecutor {
     private boolean cancelled = false;
 
     public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints,
-            boolean predistributed) {
+            DeployedJobSpecId deployedJobSpecId) {
         this.ccs = ccs;
         this.jobRun = jobRun;
-        this.predistributed = predistributed;
+        this.deployedJobSpecId = deployedJobSpecId;
         solver = new PartitionConstraintSolver();
         partitionProducingTaskClusterMap = new HashMap<>();
         inProgressTaskClusters = new HashSet<>();
@@ -99,8 +100,8 @@ public class JobExecutor {
         random = new Random();
     }
 
-    public boolean isPredistributed() {
-        return predistributed;
+    public boolean isDeployed() {
+        return deployedJobSpecId != null;
     }
 
     public JobRun getJobRun() {
@@ -502,7 +503,7 @@ public class JobExecutor {
                 new HashMap<>(jobRun.getConnectorPolicyMap());
         INodeManager nodeManager = ccs.getNodeManager();
         try {
-            byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg);
+            byte[] acgBytes = isDeployed() ? null : JavaSerializationUtils.serialize(acg);
             for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
                 String nodeId = entry.getKey();
                 final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
@@ -515,7 +516,8 @@ public class JobExecutor {
                     }
                     byte[] jagBytes = changed ? acgBytes : null;
                     node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
-                            connectorPolicies, jobRun.getFlags());
+                            connectorPolicies, jobRun.getFlags(),
+                            ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId);
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index fa22dd3..26f8022 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -142,6 +142,7 @@ public class JobManager implements IJobManager {
 
     @Override
     public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException {
+        ccs.removeJobParameterByteStore(run.getJobId());
         checkJob(run);
         if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
             run.setPendingStatus(JobStatus.FAILURE, exceptions);
@@ -306,9 +307,7 @@ public class JobManager implements IJobManager {
 
         CCServiceContext serviceCtx = ccs.getContext();
         JobSpecification spec = run.getJobSpecification();
-        if (!run.getExecutor().isPredistributed()) {
-            serviceCtx.notifyJobCreation(jobId, spec);
-        }
+        serviceCtx.notifyJobCreation(jobId, spec);
         run.setStatus(JobStatus.RUNNING, null);
         executeJobInternal(run);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index ef0bca2..58f44ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.ActivityClusterId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
@@ -45,7 +46,7 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.PreDistributedJobStore.PreDistributedJobDescriptor;
+import org.apache.hyracks.control.cc.DeployedJobSpecStore.DeployedJobSpecDescriptor;
 import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails;
 import org.apache.hyracks.control.cc.executor.JobExecutor;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
@@ -114,21 +115,23 @@ public class JobRun implements IJobStatusConditionVariable {
         createTime = System.currentTimeMillis();
     }
 
-    //Run a Pre-distributed job by passing the JobId
+    //Run a deployed job spec
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
-            PreDistributedJobDescriptor distributedJobDescriptor)
+            DeployedJobSpecDescriptor deployedJobSpecDescriptor, Map<byte[], byte[]> jobParameters,
+            DeployedJobSpecId deployedJobSpecId)
             throws HyracksException {
         this(deploymentId, jobId, jobFlags,
-                distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
-        Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
-        this.scheduler = new JobExecutor(ccs, this, constaints, true);
+                deployedJobSpecDescriptor.getJobSpecification(), deployedJobSpecDescriptor.getActivityClusterGraph());
+        ccs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
+        Set<Constraint> constaints = deployedJobSpecDescriptor.getActivityClusterGraphConstraints();
+        this.scheduler = new JobExecutor(ccs, this, constaints, deployedJobSpecId);
     }
 
     //Run a new job by creating an ActivityClusterGraph
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
             IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) {
         this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), acgg.initialize());
-        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false);
+        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), null);
     }
 
     public DeploymentId getDeploymentId() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
new file mode 100644
index 0000000..f7335a8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DeployJobSpecWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final byte[] acggfBytes;
+    private final DeployedJobSpecId deployedJobSpecId;
+    private final IResultCallback<DeployedJobSpecId> callback;
+
+    public DeployJobSpecWork(ClusterControllerService ccs, byte[] acggfBytes, DeployedJobSpecId deployedJobSpecId,
+            IResultCallback<DeployedJobSpecId> callback) {
+        this.deployedJobSpecId = deployedJobSpecId;
+        this.ccs = ccs;
+        this.acggfBytes = acggfBytes;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            final CCServiceContext ccServiceCtx = ccs.getContext();
+            ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId);
+            IActivityClusterGraphGeneratorFactory acggf =
+                    (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
+            IActivityClusterGraphGenerator acgg =
+                    acggf.createActivityClusterGraphGenerator(ccServiceCtx, EnumSet.noneOf(JobFlag.class));
+            ActivityClusterGraph acg = acgg.initialize();
+            ccs.getDeployedJobSpecStore().addDeployedJobSpecDescriptor(deployedJobSpecId, acg,
+                    acggf.getJobSpecification(),
+                    acgg.getConstraints());
+
+            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+
+            INodeManager nodeManager = ccs.getNodeManager();
+            for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
+                node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes);
+            }
+            callback.setValue(deployedJobSpecId);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
new file mode 100644
index 0000000..8afdf42
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DeployedJobFailureWork extends SynchronizableWork {
+    protected final DeployedJobSpecId deployedJobSpecId;
+    protected final String nodeId;
+
+    public DeployedJobFailureWork(DeployedJobSpecId deployedJobSpecId, String nodeId) {
+        this.deployedJobSpecId = deployedJobSpecId;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void doRun() throws HyracksException {
+        throw HyracksException.create(ErrorCode.DEPLOYED_JOB_FAILURE, deployedJobSpecId, nodeId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
deleted file mode 100644
index df98252..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-
-public class DestroyJobWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-    private final JobId jobId;
-    private final IResultCallback<JobId> callback;
-
-    public DestroyJobWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobId> callback) {
-        this.jobId = jobId;
-        this.ccs = ccs;
-        this.callback = callback;
-    }
-
-    @Override
-    protected void doRun() throws Exception {
-        try {
-            ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId);
-            INodeManager nodeManager = ccs.getNodeManager();
-            for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
-                node.getNodeController().destroyJob(jobId);
-            }
-            callback.setValue(jobId);
-        } catch (Exception e) {
-            callback.setException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
deleted file mode 100644
index 5a57b1b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import java.util.EnumSet;
-
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
-import org.apache.hyracks.api.util.JavaSerializationUtils;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCServiceContext;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
-import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-
-public class DistributeJobWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-    private final byte[] acggfBytes;
-    private final JobIdFactory jobIdFactory;
-    private final IResultCallback<JobId> callback;
-
-    public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobIdFactory jobIdFactory,
-            IResultCallback<JobId> callback) {
-        this.jobIdFactory = jobIdFactory;
-        this.ccs = ccs;
-        this.acggfBytes = acggfBytes;
-        this.callback = callback;
-    }
-
-    @Override
-    protected void doRun() throws Exception {
-        try {
-            JobId jobId = jobIdFactory.create();
-            final CCServiceContext ccServiceCtx = ccs.getContext();
-            ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
-            IActivityClusterGraphGeneratorFactory acggf =
-                    (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
-            IActivityClusterGraphGenerator acgg =
-                    acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, EnumSet.noneOf(JobFlag.class));
-            ActivityClusterGraph acg = acgg.initialize();
-            ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, acg, acggf.getJobSpecification(),
-                    acgg.getConstraints());
-
-            ccServiceCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
-
-            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
-
-            INodeManager nodeManager = ccs.getNodeManager();
-            for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
-                node.getNodeController().distributeJob(jobId, acgBytes);
-            }
-
-            callback.setValue(jobId);
-        } catch (Exception e) {
-            callback.setException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
deleted file mode 100644
index f7fa2a4..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-
-public class DistributedJobFailureWork extends SynchronizableWork {
-    protected final JobId jobId;
-    protected final String nodeId;
-
-    public DistributedJobFailureWork(JobId jobId, String nodeId) {
-        this.jobId = jobId;
-        this.nodeId = nodeId;
-    }
-
-    @Override
-    public void doRun() throws HyracksException {
-        throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, jobId, nodeId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index ed82705..cfedfc9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
@@ -37,20 +39,23 @@ import org.apache.hyracks.control.common.work.SynchronizableWork;
 public class JobStartWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final byte[] acggfBytes;
-    private final EnumSet<JobFlag> jobFlags;
+    private final Set<JobFlag> jobFlags;
     private final DeploymentId deploymentId;
-    private final JobId preDistributedJobId;
     private final IResultCallback<JobId> callback;
     private final JobIdFactory jobIdFactory;
+    private final DeployedJobSpecId deployedJobSpecId;
+    private final Map<byte[], byte[]> jobParameters;
 
     public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
-            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, JobIdFactory jobIdFactory) {
+            Set<JobFlag> jobFlags, JobIdFactory jobIdFactory, Map<byte[], byte[]> jobParameters,
+            IResultCallback<JobId> callback, DeployedJobSpecId deployedJobSpecId) {
         this.deploymentId = deploymentId;
-        this.preDistributedJobId = jobId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.jobFlags = jobFlags;
         this.callback = callback;
+        this.deployedJobSpecId = deployedJobSpecId;
+        this.jobParameters = jobParameters;
         this.jobIdFactory = jobIdFactory;
     }
 
@@ -61,19 +66,18 @@ public class JobStartWork extends SynchronizableWork {
             final CCServiceContext ccServiceCtx = ccs.getContext();
             JobId jobId;
             JobRun run;
-            if (preDistributedJobId == null) {
-                jobId = jobIdFactory.create();
+            jobId = jobIdFactory.create();
+            if (deployedJobSpecId == null) {
                 //Need to create the ActivityClusterGraph
                 IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
                         .deserialize(acggfBytes, deploymentId, ccServiceCtx);
-                IActivityClusterGraphGenerator acgg =
-                        acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, jobFlags);
+                IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(ccServiceCtx, jobFlags);
                 run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
             } else {
-                jobId = preDistributedJobId;
                 //ActivityClusterGraph has already been distributed
                 run = new JobRun(ccs, deploymentId, jobId, jobFlags,
-                        ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+                        ccs.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(deployedJobSpecId), jobParameters,
+                        deployedJobSpecId);
             }
             jobManager.add(run);
             callback.setValue(jobId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
new file mode 100644
index 0000000..143c8c1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class UndeployJobSpecWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final DeployedJobSpecId deployedJobSpecId;
+    private final IResultCallback<DeployedJobSpecId> callback;
+
+    public UndeployJobSpecWork(ClusterControllerService ccs, DeployedJobSpecId deployedJobSpecId,
+            IResultCallback<DeployedJobSpecId> callback) {
+        this.deployedJobSpecId = deployedJobSpecId;
+        this.ccs = ccs;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            ccs.getDeployedJobSpecStore().removeDeployedJobSpecDescriptor(deployedJobSpecId);
+            INodeManager nodeManager = ccs.getNodeManager();
+            for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
+                node.getNodeController().undeployJobSpec(deployedJobSpecId);
+            }
+            callback.setValue(deployedJobSpecId);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index ec8e045..6fd321e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -44,7 +45,7 @@ public interface IClusterController {
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
             throws Exception;
 
-    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception;
+    public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
 
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index a10f8f0..5d781cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.control.common.base;
 
 import java.net.URL;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +28,7 @@ import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
@@ -38,7 +38,8 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 public interface INodeController {
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags) throws Exception;
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+            throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
@@ -50,9 +51,9 @@ public interface INodeController {
 
     public void undeployBinary(DeploymentId deploymentId) throws Exception;
 
-    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+    public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
 
-    public void destroyJob(JobId jobId) throws Exception;
+    public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     public void dumpState(String stateDumpId) throws Exception;
 


Mime
View raw message