asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [09/34] asterixdb git commit: [NO ISSUE][RT] Add job start timestamp to the joblet context
Date Mon, 09 Apr 2018 21:26:27 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
index 1ddb7e8..2879576 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
@@ -23,14 +23,14 @@ import java.io.IOException;
 
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
 import org.apache.asterix.om.base.temporal.DateTimeFormatUtils;
-import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
 import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
+import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -45,8 +45,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
-    public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_DATE;
-    private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
 
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
 
@@ -76,6 +74,8 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
                     private StringBuilder sbder = new StringBuilder();
                     private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
 
+                    private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance();
+
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
                         resultStorage.reset();
@@ -101,11 +101,11 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
                             int formatLength = UTF8StringUtil.getUTFLength(bytes1, offset1 + 1);
                             int offset = UTF8StringUtil.getNumBytesToStoreLength(formatLength);
                             sbder.delete(0, sbder.length());
-                            DT_UTILS.printDateTime(chronon, 0, bytes1, offset1 + 1 + offset, formatLength, sbder,
+                            util.printDateTime(chronon, 0, bytes1, offset1 + 1 + offset, formatLength, sbder,
                                     DateTimeParseMode.DATE_ONLY);
 
                             out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
-                            utf8Writer.writeUTF8(sbder.toString(), out);
+                            utf8Writer.writeUTF8(sbder, out);
                         } catch (IOException ex) {
                             throw new HyracksDataException(ex);
                         }
@@ -113,7 +113,6 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
                     }
                 };
             }
-
         };
     }
 
@@ -122,7 +121,6 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
      */
     @Override
     public FunctionIdentifier getIdentifier() {
-        return FID;
+        return BuiltinFunctions.PRINT_DATE;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
index b9f6e50..6f22bf1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
@@ -24,12 +24,12 @@ import java.io.IOException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
 import org.apache.asterix.om.base.temporal.DateTimeFormatUtils;
 import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -44,8 +44,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
-    public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_DATETIME;
-    private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
 
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
 
@@ -75,6 +73,8 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
                     private UTF8StringWriter utf8Writer = new UTF8StringWriter();
                     private UTF8StringPointable utf8Ptr = new UTF8StringPointable();
 
+                    private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance();
+
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
                         resultStorage.reset();
@@ -100,11 +100,11 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
                             utf8Ptr.set(bytes1, offset1 + 1, len1 - 1);
                             int formatLength = utf8Ptr.getUTF8Length();
                             sbder.delete(0, sbder.length());
-                            DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
+                            util.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
                                     formatLength, sbder, DateTimeParseMode.DATETIME);
 
                             out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
-                            utf8Writer.writeUTF8(sbder.toString(), out);
+                            utf8Writer.writeUTF8(sbder, out);
                         } catch (IOException ex) {
                             throw new HyracksDataException(ex);
                         }
@@ -112,7 +112,6 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
                     }
                 };
             }
-
         };
     }
 
@@ -121,7 +120,6 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
      */
     @Override
     public FunctionIdentifier getIdentifier() {
-        return FID;
+        return BuiltinFunctions.PRINT_DATETIME;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
index 51c9dcd..0e7004f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
@@ -24,12 +24,12 @@ import java.io.IOException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
 import org.apache.asterix.om.base.temporal.DateTimeFormatUtils;
 import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -44,8 +44,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
-    public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_TIME;
-    private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
 
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
 
@@ -74,6 +72,8 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
                     private final UTF8StringWriter writer = new UTF8StringWriter();
                     private final UTF8StringPointable utf8Ptr = new UTF8StringPointable();
 
+                    private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance();
+
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
                         resultStorage.reset();
@@ -100,11 +100,11 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
                             utf8Ptr.set(bytes1, offset1 + 1, len1 - 1);
                             int formatLength = utf8Ptr.getUTF8Length();
                             sbder.delete(0, sbder.length());
-                            DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
+                            util.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
                                     formatLength, sbder, DateTimeParseMode.TIME_ONLY);
 
                             out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
-                            writer.writeUTF8(sbder.toString(), out);
+                            writer.writeUTF8(sbder, out);
                         } catch (IOException ex) {
                             throw new HyracksDataException(ex);
                         }
@@ -112,7 +112,6 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
                     }
                 };
             }
-
         };
     }
 
@@ -121,7 +120,6 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
      */
     @Override
     public FunctionIdentifier getIdentifier() {
-        return FID;
+        return BuiltinFunctions.PRINT_TIME;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 1100335..7a59926 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -31,6 +31,8 @@ public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocat
 
     JobId getJobId();
 
+    long getJobStartTime();
+
     ICounterContext getCounterContext();
 
     Object getGlobalJobData();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index ac06344..5245571 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -518,7 +518,8 @@ public class JobExecutor {
                     byte[] jagBytes = changed ? acgBytes : null;
                     node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
                             connectorPolicies, jobRun.getFlags(),
-                            ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId);
+                            ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId,
+                            jobRun.getStartTime());
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 9ec55f4..92764a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -38,7 +38,7 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 public interface INodeController {
     void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long startTime)
             throws Exception;
 
     void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index c10c8981..8e02936 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -764,11 +764,12 @@ public class CCNCFunctions {
         private final Set<JobFlag> flags;
         private final Map<byte[], byte[]> jobParameters;
         private final DeployedJobSpecId deployedJobSpecId;
+        private final long jobStartTime;
 
         public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
                 List<TaskAttemptDescriptor> taskDescriptors,
                 Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags,
-                Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) {
+                Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long jobStartTime) {
             this.deploymentId = deploymentId;
             this.jobId = jobId;
             this.planBytes = planBytes;
@@ -777,6 +778,7 @@ public class CCNCFunctions {
             this.flags = flags;
             this.jobParameters = jobParameters;
             this.deployedJobSpecId = deployedJobSpecId;
+            this.jobStartTime = jobStartTime;
         }
 
         @Override
@@ -816,6 +818,10 @@ public class CCNCFunctions {
             return flags;
         }
 
+        public long getJobStartTime() {
+            return jobStartTime;
+        }
+
         public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
             ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
             DataInputStream dis = new DataInputStream(bais);
@@ -885,8 +891,10 @@ public class CCNCFunctions {
                 deployedJobSpecId = DeployedJobSpecId.create(dis);
             }
 
+            long jobStartTime = dis.readLong();
+
             return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags,
-                    jobParameters, deployedJobSpecId);
+                    jobParameters, deployedJobSpecId, jobStartTime);
         }
 
         public static void serialize(OutputStream out, Object object) throws Exception {
@@ -935,11 +943,13 @@ public class CCNCFunctions {
             }
 
             //write deployed job spec id
-            dos.writeBoolean(fn.getDeployedJobSpecId() == null ? false : true);
+            dos.writeBoolean(fn.getDeployedJobSpecId() != null);
             if (fn.getDeployedJobSpecId() != null) {
                 fn.getDeployedJobSpecId().writeFields(dos);
             }
 
+            //write job start time
+            dos.writeLong(fn.jobStartTime);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 429cb26..b78e53f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -63,10 +63,10 @@ public class NodeControllerRemoteProxy implements INodeController {
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
-            throws Exception {
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId,
+            long jobStartTime) throws Exception {
         StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors,
-                connectorPolicies, flags, jobParameters, deployedJobSpecId);
+                connectorPolicies, flags, jobParameters, deployedJobSpecId, jobStartTime);
         ipcHandle.send(-1, stf, null);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 8790434..55bc192 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -103,9 +103,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
 
     private final IJobletEventListenerFactory jobletEventListenerFactory;
 
+    private final long jobStartTime;
+
     public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
             INCServiceContext serviceCtx, ActivityClusterGraph acg,
-            IJobletEventListenerFactory jobletEventListenerFactory) {
+            IJobletEventListenerFactory jobletEventListenerFactory, long jobStartTime) {
         this.nodeController = nodeController;
         this.serviceCtx = serviceCtx;
         this.deploymentId = deploymentId;
@@ -131,6 +133,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
         }
         IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
         globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
+        this.jobStartTime = jobStartTime;
     }
 
     @Override
@@ -151,6 +154,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
         return env;
     }
 
+    @Override
+    public long getJobStartTime() {
+        return jobStartTime;
+    }
+
     public void addTask(Task task) {
         taskMap.put(task.getTaskAttemptId(), task);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index f55e250..735f7cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -65,7 +65,7 @@ final class NodeControllerIPCI implements IIPCI {
                 ncs.getWorkQueue()
                         .schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(),
                                 stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags(),
-                                stf.getJobParameters(), stf.getDeployedJobSpecId()));
+                                stf.getJobParameters(), stf.getDeployedJobSpecId(), stf.getJobStartTime()));
                 return;
             case ABORT_TASKS:
                 CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 6a5785a..660621e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -95,10 +95,12 @@ public class StartTasksWork extends AbstractWork {
 
     private final Map<byte[], byte[]> jobParameters;
 
+    private final long jobStartTime;
+
     public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags,
-            Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) {
+            Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long jobStartTime) {
         this.ncs = ncs;
         this.deploymentId = deploymentId;
         this.jobId = jobId;
@@ -108,6 +110,7 @@ public class StartTasksWork extends AbstractWork {
         this.connectorPoliciesMap = connectorPoliciesMap;
         this.flags = flags;
         this.jobParameters = jobParameters;
+        this.jobStartTime = jobStartTime;
     }
 
     @Override
@@ -212,7 +215,7 @@ public class StartTasksWork extends AbstractWork {
                 }
                 listenerFactory.updateListenerJobParameters(ncs.createOrGetJobParameterByteStore(jobId));
             }
-            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, listenerFactory);
+            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, listenerFactory, jobStartTime);
             jobletMap.put(jobId, ji);
         }
         return ji;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 1d38d3d..b022b15 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -39,6 +39,7 @@ public class TestJobletContext implements IHyracksJobletContext {
     private final FrameManager frameManger;
     private JobId jobId;
     private WorkspaceFileFactory fileFactory;
+    private final long jobStartTime;
 
     public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
         this.frameSize = frameSize;
@@ -46,6 +47,7 @@ public class TestJobletContext implements IHyracksJobletContext {
         this.jobId = jobId;
         fileFactory = new WorkspaceFileFactory(this, (IIOManager) getIOManager());
         this.frameManger = new FrameManager(frameSize);
+        this.jobStartTime = System.currentTimeMillis();
     }
 
     ByteBuffer allocateFrame() throws HyracksDataException {
@@ -113,6 +115,11 @@ public class TestJobletContext implements IHyracksJobletContext {
     }
 
     @Override
+    public long getJobStartTime() {
+        return jobStartTime;
+    }
+
+    @Override
     public Object getGlobalJobData() {
         return null;
     }


Mime
View raw message