asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [WIP][ASTERIXDB-2108][API][RT] Add Processed Objects Metric
Date Thu, 28 Sep 2017 01:08:54 GMT
Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2032

Change subject: [WIP][ASTERIXDB-2108][API][RT] Add Processed Objects Metric
......................................................................

[WIP][ASTERIXDB-2108][API][RT] Add Processed Objects Metric

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Introduce OperatorStats API to report operators runtime stats.
- Introduce StatsCollector API to report task runtime stats.
- Add OperatorStats for IndexSearchOperatorNodePushable (tuple count only).
- Add "processedObjects" metric to QueryService API.
- Pass "processedObjects" metric from CC to NCQueryService in ResultMetadata.
- Add metrics test cases.

Change-Id: Ie4afe6a676ef0b8a31d36d7dafc13a4023ebf177
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
A asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
A asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
A asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
34 files changed, 892 insertions(+), 18 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/32/2032/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 57c4809..9f71be4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -62,17 +62,27 @@
 
     class ResultMetadata implements Serializable {
         private static final long serialVersionUID = 1L;
+        private long processedObjects;
 
         private final List<Triple<JobId, ResultSetId, ARecordType>> resultSets = new ArrayList<>();
 
         public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() {
             return resultSets;
         }
+
+        public long getProcessedObjects() {
+            return processedObjects;
+        }
+
+        public void setProcessedObjects(long processedObjects) {
+            this.processedObjects = processedObjects;
+        }
     }
 
-    public static class Stats {
+    class Stats {
         private long count;
         private long size;
+        private long processedObjects;
 
         public long getCount() {
             return count;
@@ -90,6 +100,13 @@
             this.size = size;
         }
 
+        public long getProcessedObjects() {
+            return processedObjects;
+        }
+
+        public void setProcessedObjects(long processedObjects) {
+            this.processedObjects = processedObjects;
+        }
     }
 
     /**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index f4c3949..b592ef2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -120,6 +120,7 @@
         if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) {
             for (Triple<JobId, ResultSetId, ARecordType> rsmd : resultMetadata.getResultSets()) {
                 ResultReader resultReader = new ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
+                stats.setProcessedObjects(resultMetadata.getProcessedObjects());
                 ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, rsmd.getRight());
             }
         } else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index ad34d96..4e1d724 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -147,7 +147,8 @@
         EXECUTION_TIME("executionTime"),
         RESULT_COUNT("resultCount"),
         RESULT_SIZE("resultSize"),
-        ERROR_COUNT("errorCount");
+        ERROR_COUNT("errorCount"),
+        PROCESSED_OBJECTS_COUNT("processedObjects");
 
         private final String str;
 
@@ -271,7 +272,7 @@
     }
 
     private static void printMetrics(PrintWriter pw, long elapsedTime, long executionTime, long resultCount,
-            long resultSize, long errorCount) {
+            long resultSize, long processedObjects, long errorCount) {
         boolean hasErrors = errorCount != 0;
         pw.print("\t\"");
         pw.print(ResultFields.METRICS.str());
@@ -283,7 +284,10 @@
         pw.print("\t");
         ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true);
         pw.print("\t");
-        ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, hasErrors);
+        ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, true);
+        pw.print("\t");
+        ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), processedObjects, hasErrors);
+        pw.print("\t");
         if (hasErrors) {
             pw.print("\t");
             ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), errorCount, false);
@@ -421,7 +425,7 @@
             }
         }
         printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
-                stats.getCount(), stats.getSize(), errorCount);
+                stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount);
         resultWriter.print("}\n");
         resultWriter.flush();
         String result = stringWriter.toString();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2abc18f..768fe23 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -26,6 +26,7 @@
 import java.io.InputStreamReader;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
@@ -195,7 +196,14 @@
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
+import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
+import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 /*
@@ -2367,12 +2375,14 @@
             case IMMEDIATE:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
+                    updateJobStats(id, outMetadata, stats);
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
                 }, clientContextId, ctx);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
+                    updateJobStats(id, outMetadata, stats);
                     ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
                     if (outMetadata != null) {
                         outMetadata.getResultSets()
@@ -2385,6 +2395,28 @@
         }
     }
 
+    private void updateJobStats(JobId jobId, ResultMetadata outMetadata, Stats stats) {
+        final IJobManager jobManager =
+                ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
+        final JobRun run = jobManager.get(jobId);
+        if (run.getStatus() != JobStatus.TERMINATED) {
+            return;
+        }
+        final JobProfile jobProfile = run.getJobProfile();
+        final Collection<JobletProfile> jobletProfiles = jobProfile.getJobletProfiles().values();
+        long processedObjects = 0;
+        for (JobletProfile jp : jobletProfiles) {
+            final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
+            for (TaskProfile tp : jobletTasksProfile) {
+                processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
+            }
+        }
+        stats.setProcessedObjects(processedObjects);
+        if (outMetadata != null) {
+            outMetadata.setProcessedObjects(processedObjects);
+        }
+    }
+
     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
             ResultDelivery resultDelivery, String clientContextId, IStatementExecutorContext ctx,
             ResultSetId resultSetId, MutableBoolean printed) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 0bba635..e1040ba 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -20,6 +20,7 @@
 
 import java.io.InputStream;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.logging.Logger;
 
@@ -134,4 +135,23 @@
         }
         return null;
     }
+
+    public static InputStream extractMetrics(InputStream resultStream) throws Exception {
+        ObjectMapper om = new ObjectMapper();
+        String resultStr = IOUtils.toString(resultStream, Charset.defaultCharset());
+        ObjectNode result = om.readValue(resultStr, ObjectNode.class);
+        String metrics = "";
+        String field;
+        for (Iterator<String> sIter = result.fieldNames(); sIter.hasNext(); ) {
+            field = sIter.next();
+            switch (field) {
+                case "metrics":
+                    metrics = om.writeValueAsString(result.get(field));
+                    break;
+                default:
+                    break;
+            }
+        }
+        return IOUtils.toInputStream(metrics, StandardCharsets.UTF_8);
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index b9b7bda..6dff571 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -67,9 +67,9 @@
 import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
 import org.apache.asterix.testframework.context.TestFileContext;
 import org.apache.asterix.testframework.xml.ComparisonEnum;
-import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
 import org.apache.asterix.testframework.xml.TestCase.CompilationUnit.Parameter;
+import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.output.ByteArrayOutputStream;
@@ -124,6 +124,7 @@
     public static final String DELIVERY_ASYNC = "async";
     public static final String DELIVERY_DEFERRED = "deferred";
     public static final String DELIVERY_IMMEDIATE = "immediate";
+    private static final String METRICS_QUERY_TYPE = "metrics";
 
     private static Method managixExecuteMethod = null;
     private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
@@ -888,6 +889,7 @@
             case "query":
             case "async":
             case "deferred":
+            case METRICS_QUERY_TYPE:
                 // isDmlRecoveryTest: insert Crash and Recovery
                 if (isDmlRecoveryTest) {
                     executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
@@ -1203,7 +1205,11 @@
             final URI uri = getEndpoint(Servlets.QUERY_SERVICE);
             if (DELIVERY_IMMEDIATE.equals(delivery)) {
                 resultStream = executeQueryService(statement, fmt, uri, params, true, null, true);
-                resultStream = ResultExtractor.extract(resultStream);
+                if (reqType.equals(METRICS_QUERY_TYPE)) {
+                    resultStream = ResultExtractor.extractMetrics(resultStream);
+                } else {
+                    resultStream = ResultExtractor.extract(resultStream);
+                }
             } else {
                 String handleVar = getHandleVariable(statement);
                 resultStream = executeQueryService(statement, fmt, uri, upsertParam(params, "mode", delivery), true);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
new file mode 100644
index 0000000..c7fae46
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the cluster state runtime tests with the storage parallelism.
+ */
+@RunWith(Parameterized.class)
+public class MetricsExecutionTest {
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "MetricsExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.buildTestsInXml("metrics.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public MetricsExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
new file mode 100644
index 0000000..f65ede3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+            QueryFileExtension=".sqlpp">
+  <test-group name="metrics">
+    <test-case FilePath="metrics">
+      <compilation-unit name="full-scan">
+        <output-dir compare="Text">full-scan</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="metrics">
+      <compilation-unit name="primary-index">
+        <output-dir compare="Text">primary-index</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="metrics">
+      <compilation-unit name="secondary-index">
+        <output-dir compare="Text">secondary-index</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
new file mode 100644
index 0000000..441bee4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+  number : bigint,
+  street : string,
+  city : string
+};
+
+create type test.CustomerType as
+ closed {
+  cid : bigint,
+  name : string,
+  age : bigint?,
+  address : AddressType?,
+  lastorder : {
+      oid : bigint,
+      total : float
+  }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
new file mode 100644
index 0000000..2c5d5d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+  ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+  (`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
new file mode 100644
index 0000000..3671133
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where name = "Marvella Loud";
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
new file mode 100644
index 0000000..4cccd9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
new file mode 100644
index 0000000..5fb58b4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+  number : bigint,
+  street : string,
+  city : string
+};
+
+create type test.CustomerType as
+ closed {
+  cid : bigint,
+  name : string,
+  age : bigint?,
+  address : AddressType?,
+  lastorder : {
+      oid : bigint,
+      total : float
+  }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
new file mode 100644
index 0000000..046c40b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+  ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+  (`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
new file mode 100644
index 0000000..969949e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where cid = 996;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
new file mode 100644
index 0000000..1dbe56e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
new file mode 100644
index 0000000..74f8a19
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+  number : bigint,
+  street : string,
+  city : string
+};
+
+create type test.CustomerType as
+ closed {
+  cid : bigint,
+  name : string,
+  age : bigint?,
+  address : AddressType?,
+  lastorder : {
+      oid : bigint,
+      total : float
+  }
+};
+
+create dataset Customers(CustomerType) primary key cid;
+create index customer_name_idx on Customers(name);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
new file mode 100644
index 0000000..40d87f7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+  ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+  (`format`=`adm`));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
new file mode 100644
index 0000000..1c5126f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where name = "Marvella Loud";
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
new file mode 100644
index 0000000..1de3b3b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
new file mode 100644
index 0000000..aca314d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":10.*
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
new file mode 100644
index 0000000..91a2dfd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":1.*
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
new file mode 100644
index 0000000..8578d41
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":2.*
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index df693b2..10bb336 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
 import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
 
@@ -52,4 +53,6 @@
     Object getSharedObject();
 
     Set<JobFlag> getJobFlags();
+
+    IStatsCollector getStatsCollector();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
new file mode 100644
index 0000000..35ae8b6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job.profiling;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.io.IWritable;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+public interface IOperatorStats extends IWritable, Serializable {
+
+    /**
+     * @return The name of the operator
+     */
+    String getName();
+
+    /**
+     * @return A counter used to track the number of tuples
+     * accessed by an operator
+     */
+    ICounter getTupleCounter();
+
+    /**
+     * @return A counter used to track the execution time
+     * of an operator
+     */
+    ICounter getTimeCounter();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
new file mode 100644
index 0000000..1e73146
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job.profiling;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IWritable;
+
+public interface IStatsCollector extends IWritable, Serializable {
+
+    /**
+     * Adds {@link IOperatorStats} to the stats collections
+     *
+     * @param operatorStats
+     * @throws HyracksDataException when an operator with the same was already added.
+     */
+    void add(IOperatorStats operatorStats) throws HyracksDataException;
+
+    /**
+     * @param operatorName
+     * @return {@link IOperatorStats} for the operator with name <code>operatorName</code>
+     * if one exists or else null.
+     */
+    IOperatorStats getOperatorStats(String operatorName);
+
+    /**
+     * @return A special {@link IOperatorStats} that has the aggregated stats
+     * from all operators in the collection.
+     */
+    IOperatorStats getAggregatedStats();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
new file mode 100644
index 0000000..718ac3d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.job.profiling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+import org.apache.hyracks.control.common.job.profiling.counters.Counter;
+
+public class OperatorStats implements IOperatorStats {
+
+    public final String operatorName;
+    public final ICounter tupleCounter;
+    public final ICounter timeCounter;
+
+    public OperatorStats(String operatorName) {
+        if (operatorName == null || operatorName.isEmpty()) {
+            throw new IllegalArgumentException("operatorName must not be null or empty");
+        }
+        this.operatorName = operatorName;
+        tupleCounter = new Counter("tupleCounter");
+        timeCounter = new Counter("timeCounter");
+    }
+
+    public static IOperatorStats create(DataInput input) throws IOException {
+        String name = input.readUTF();
+        OperatorStats operatorStats = new OperatorStats(name);
+        operatorStats.readFields(input);
+        return operatorStats;
+    }
+
+    @Override
+    public String getName() {
+        return operatorName;
+    }
+
+    @Override
+    public ICounter getTupleCounter() {
+        return tupleCounter;
+    }
+
+    @Override
+    public ICounter getTimeCounter() {
+        return timeCounter;
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeUTF(operatorName);
+        output.writeLong(tupleCounter.get());
+        output.writeLong(timeCounter.get());
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        tupleCounter.set(input.readLong());
+        timeCounter.set(input.readLong());
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
new file mode 100644
index 0000000..90cdc72
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.job.profiling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+
+public class StatsCollector implements IStatsCollector {
+
+    private final Map<String, IOperatorStats> operatorStatsMap = new HashMap<>();
+
+    @Override
+    public void add(IOperatorStats operatorStats) throws HyracksDataException {
+        if (operatorStatsMap.containsKey(operatorStats.getName())) {
+            throw new IllegalArgumentException("Operator with the same name already exists");
+        }
+        operatorStatsMap.put(operatorStats.getName(), operatorStats);
+    }
+
+    @Override
+    public IOperatorStats getOperatorStats(String operatorName) {
+        return operatorStatsMap.get(operatorName);
+    }
+
+    public static StatsCollector create(DataInput input) throws IOException {
+        StatsCollector statsCollector = new StatsCollector();
+        statsCollector.readFields(input);
+        return statsCollector;
+    }
+
+    @Override
+    public IOperatorStats getAggregatedStats() {
+        IOperatorStats aggregatedStats = new OperatorStats("aggregated");
+        for (IOperatorStats stats : operatorStatsMap.values()) {
+            aggregatedStats.getTupleCounter().update(stats.getTupleCounter().get());
+            aggregatedStats.getTimeCounter().update(stats.getTupleCounter().get());
+        }
+        return aggregatedStats;
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(operatorStatsMap.size());
+        for (IOperatorStats operatorStats : operatorStatsMap.values()) {
+            operatorStats.writeFields(output);
+        }
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int operatorCount = input.readInt();
+        for (int i = 0; i < operatorCount; i++) {
+            IOperatorStats opStats = OperatorStats.create(input);
+            operatorStatsMap.put(opStats.getName(), opStats);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 680d2f9..5cac38c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -25,12 +25,15 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
+import org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
 
 public class TaskProfile extends AbstractProfile {
     private static final long serialVersionUID = 1L;
@@ -38,6 +41,8 @@
     private TaskAttemptId taskAttemptId;
 
     private Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+    private IStatsCollector statsCollector;
 
     public static TaskProfile create(DataInput dis) throws IOException {
         TaskProfile taskProfile = new TaskProfile();
@@ -49,9 +54,11 @@
 
     }
 
-    public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile) {
+    public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile,
+            IStatsCollector statsCollector) {
         this.taskAttemptId = taskAttemptId;
         this.partitionSendProfile = new HashMap<PartitionId, PartitionProfile>(partitionSendProfile);
+        this.statsCollector = statsCollector;
     }
 
     public TaskAttemptId getTaskId() {
@@ -104,6 +111,10 @@
         return json;
     }
 
+    public IStatsCollector getStatsCollector() {
+        return statsCollector;
+    }
+
     @Override
     public void readFields(DataInput input) throws IOException {
         super.readFields(input);
@@ -115,6 +126,7 @@
             PartitionProfile value = PartitionProfile.create(input);
             partitionSendProfile.put(key, value);
         }
+        statsCollector = StatsCollector.create(input);
     }
 
     @Override
@@ -126,5 +138,6 @@
             entry.getKey().writeFields(output);
             entry.getValue().writeFields(output);
         }
+        statsCollector.writeFields(output);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index d7b4be0..6dc9619 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -53,6 +53,7 @@
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.PartitionState;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
 import org.apache.hyracks.control.common.job.profiling.counters.Counter;
 import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -190,7 +191,7 @@
         }
         for (Task task : taskMap.values()) {
             TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
-                    new Hashtable<>(task.getPartitionSendProfile()));
+                    new Hashtable<>(task.getPartitionSendProfile()), new StatsCollector());
             task.dumpProfile(taskProfile);
             jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index bff2794..fcd4bde 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -53,12 +53,14 @@
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
 import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.common.job.PartitionState;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
 import org.apache.hyracks.control.common.job.profiling.counters.Counter;
 import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -107,6 +109,8 @@
 
     private final Set<JobFlag> jobFlags;
 
+    private final IStatsCollector statsCollector;
+
     public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
             ExecutorService executor, NodeControllerService ncs,
             List<List<PartitionChannel>> inputChannelsFromConnectors) {
@@ -124,6 +128,7 @@
         exceptions = new CopyOnWriteArrayList<>(); // Multiple threads could add exceptions to this list.
         this.ncs = ncs;
         this.inputChannelsFromConnectors = inputChannelsFromConnectors;
+        statsCollector = new StatsCollector();
     }
 
     public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -453,4 +458,9 @@
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }
+
+    @Override
+    public IStatsCollector getStatsCollector() {
+        return statsCollector;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 7728d16..675926e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -38,8 +38,8 @@
 
     @Override
     public void run() {
-        TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
-        task.dumpProfile(taskProfile);
+        TaskProfile taskProfile =
+                new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(), task.getStatsCollector());
         try {
             ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
                     ncs.getId(), taskProfile);
@@ -53,4 +53,4 @@
     public String toString() {
         return getClass().getSimpleName() + ":" + task.getTaskAttemptId();
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 07d07c3..85b6ff3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -30,6 +30,8 @@
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.control.common.job.profiling.OperatorStats;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -79,6 +81,7 @@
     protected ArrayTupleBuilder nonFilterTupleBuild;
     protected final ISearchOperationCallbackFactory searchCallbackFactory;
     protected boolean failed = false;
+    private final IOperatorStats stats;
 
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
@@ -105,6 +108,8 @@
             maxFilterKey = new PermutingFrameTupleReference();
             maxFilterKey.setFieldPermutation(maxFilterFieldIndexes);
         }
+        stats = new OperatorStats(getDisplayName());
+        ctx.getStatsCollector().add(stats);
     }
 
     protected abstract ISearchPredicate createSearchPredicate();
@@ -154,9 +159,9 @@
     }
 
     protected void writeSearchResults(int tupleIndex) throws Exception {
-        boolean matched = false;
+        long matchingTupleCount = 0;
         while (cursor.hasNext()) {
-            matched = true;
+            matchingTupleCount++;
             tb.reset();
             cursor.next();
             if (retainInput) {
@@ -174,8 +179,9 @@
             }
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
         }
+        stats.getTupleCounter().update(matchingTupleCount);
 
-        if (!matched && retainInput && retainMissing) {
+        if (matchingTupleCount == 0 && retainInput && retainMissing) {
             FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex,
                     nonMatchTupleBuild.getFieldEndOffsets(), nonMatchTupleBuild.getByteArray(), 0,
                     nonMatchTupleBuild.getSize());
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index b100300..3d13cf9 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -36,8 +36,10 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
 
 public class TestTaskContext implements IHyracksTaskContext {
@@ -46,6 +48,7 @@
     private WorkspaceFileFactory fileFactory;
     private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
     private Object sharedObject;
+    private final IStatsCollector statsCollector = new StatsCollector();
 
     public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
         this.jobletContext = jobletContext;
@@ -163,4 +166,9 @@
     public Set<JobFlag> getJobFlags() {
         return EnumSet.noneOf(JobFlag.class);
     }
+
+    @Override
+    public IStatsCollector getStatsCollector() {
+        return statsCollector;
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2032
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie4afe6a676ef0b8a31d36d7dafc13a4023ebf177
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhubail@apache.org>

Mime
View raw message