tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1810: Remove QueryMasterTask cache immediately, if it stored to persistent storage.
Date Thu, 03 Sep 2015 12:31:14 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 22ab1cf97 -> 8c50410dc


TAJO-1810: Remove QueryMasterTask cache immediately, if it stored to persistent storage.

Closes #721


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8c50410d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8c50410d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8c50410d

Branch: refs/heads/master
Commit: 8c50410dcf370290982719f8add9aad814a5d4e0
Parents: 22ab1cf
Author: Jinho Kim <jhkim@apache.org>
Authored: Thu Sep 3 21:30:17 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Thu Sep 3 21:30:17 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../java/org/apache/tajo/QueryTestCaseBase.java | 12 +++
 .../org/apache/tajo/TajoTestingCluster.java     | 18 +++-
 .../java/org/apache/tajo/conf/TajoConf.java     |  5 +-
 .../tajo/engine/query/TestGroupByQuery.java     | 49 +++-------
 .../tajo/engine/query/TestTablePartitions.java  | 51 ++++++----
 .../org/apache/tajo/master/TestQueryResult.java | 14 +--
 .../apache/tajo/querymaster/TestQueryState.java | 22 +++--
 .../tajo/querymaster/TestTaskStatusUpdate.java  | 68 +++++---------
 .../org/apache/tajo/master/QueryManager.java    |  9 +-
 .../apache/tajo/querymaster/QueryMaster.java    | 97 ++++++--------------
 .../java/org/apache/tajo/querymaster/Stage.java | 11 ++-
 .../apache/tajo/util/history/HistoryWriter.java | 11 ++-
 .../tajo/worker/TajoWorkerClientService.java    | 10 +-
 14 files changed, 161 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b47f20d..350ba1e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -34,6 +34,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1810: Remove QueryMasterTask cache immediately, if it stored to 
+    persistent storage. (jinho)
+
     TAJO-993: Cleanup the result data in HDFS after query finished.
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 0f277f0..bfafd6d 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -41,6 +41,8 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.UndefinedTableException;
+import org.apache.tajo.jdbc.FetchResultSet;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
@@ -1127,4 +1129,14 @@ public class QueryTestCaseBase {
     }
     return result;
   }
+
+  public static QueryId getQueryId(ResultSet resultSet) {
+    if (resultSet instanceof TajoMemoryResultSet) {
+      return ((TajoMemoryResultSet) resultSet).getQueryId();
+    } else if (resultSet instanceof FetchResultSet) {
+      return ((FetchResultSet) resultSet).getQueryId();
+    } else {
+      throw new IllegalArgumentException(resultSet.toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index ab1c156..bb690dd 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -31,10 +31,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.tajo.annotation.NotNull;
-import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.store.*;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.client.TajoClientUtil;
@@ -53,8 +50,8 @@ import org.apache.tajo.storage.FileTablespace;
 import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.io.File;
@@ -156,7 +153,7 @@ public class TajoTestingCluster {
     conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
 
     // Memory cache termination
-    conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1);
+    conf.setIntVar(ConfVars.HISTORY_QUERY_CACHE_SIZE, 10);
 
     // Python function path
     conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString());
@@ -772,4 +769,15 @@ public class TajoTestingCluster {
     }
     return qmt;
   }
+
+  public QueryHistory getQueryHistory(QueryId queryId) throws IOException {
+    QueryHistory queryHistory = null;
+    for (TajoWorker worker : getTajoWorkers()) {
+      queryHistory = worker.getWorkerContext().getQueryMaster().getQueryHistory(queryId);
+      if (queryHistory != null) {
+        break;
+      }
+    }
+    return queryHistory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index b50ce81..2a30995 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -184,10 +184,6 @@ public class TajoConf extends Configuration {
 
     WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()),
 
-    // Tajo History
-    WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours
-    QUERYMASTER_CACHE_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 10), // 10 mins
-
     WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE("tajo.worker.heartbeat.queue.threshold-rate", 0.3f, Validators.min("0")),//30%
     WORKER_HEARTBEAT_IDLE_INTERVAL("tajo.worker.heartbeat.idle.interval", 10 * 1000),  // 10 sec
     WORKER_HEARTBEAT_ACTIVE_INTERVAL("tajo.worker.heartbeat.active.interval", 1000),  // 1 sec
@@ -276,6 +272,7 @@ public class TajoConf extends Configuration {
     HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"),
     HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"),
     HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7),
+    HISTORY_QUERY_CACHE_SIZE("tajo.history.cache.size", 100, Validators.min("0")),
 
     // Misc -------------------------------------------------------------------
     // Fragment

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index a5caf38..0fe5e0e 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -24,14 +24,11 @@ import org.apache.tajo.*;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.querymaster.Query;
-import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.querymaster.Stage;
-import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.TajoWorker;
+import org.apache.tajo.util.history.QueryHistory;
+import org.apache.tajo.util.history.StageHistory;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -42,7 +39,7 @@ import org.junit.runners.Parameterized.Parameters;
 import java.sql.ResultSet;
 import java.util.*;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 @Category(IntegrationTest.class)
 @RunWith(Parameterized.class)
@@ -693,8 +690,6 @@ public class TestGroupByQuery extends QueryTestCaseBase {
 
   @Test
   public final void testNumShufflePartition() throws Exception {
-
-    Thread.sleep(5000);
     KeyValueSet tableOptions = new KeyValueSet();
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
@@ -740,40 +735,18 @@ public class TestGroupByQuery extends QueryTestCaseBase {
       }
       assertEquals(uniqKeys.size(), numRows);
 
-      // find last QueryMasterTask
-      List<QueryMasterTask> qmTasks = new ArrayList<QueryMasterTask>();
-
-      for(TajoWorker worker: testingCluster.getTajoWorkers()) {
-        qmTasks.addAll(worker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks());
-      }
+      QueryId queryId = getQueryId(res);
+      QueryHistory queryHistory = testingCluster.getQueryHistory(queryId);
 
-      assertTrue(!qmTasks.isEmpty());
-
-      Collections.sort(qmTasks, new Comparator<QueryMasterTask>() {
-        @Override
-        public int compare(QueryMasterTask o1, QueryMasterTask o2) {
-          long l1 = o1.getQuerySubmitTime();
-          long l2 = o2.getQuerySubmitTime();
-          return l1 < l2 ? - 1 : (l1 > l2 ? 1 : 0);
-        }
-      });
-
-      // Getting the number of partitions. It should be 2.
-      Set<Integer> partitionIds = new HashSet<Integer>();
-
-      Query query = qmTasks.get(qmTasks.size() - 1).getQuery();
-      Collection<Stage> stages = query.getStages();
-      assertNotNull(stages);
-      assertTrue(!stages.isEmpty());
-      for (Stage stage : stages) {
-        if (stage.getId().toStringNoPrefix().endsWith("_000001")) {
-          for (Task.IntermediateEntry eachInterm: stage.getHashShuffleIntermediateEntries()) {
-            partitionIds.add(eachInterm.getPartId());
-          }
+      int shuffles = 0;
+      for (StageHistory stage : queryHistory.getStageHistories()) {
+        if (stage.getExecutionBlockId().endsWith("_000001")) {
+          // Getting the number of partitions. It should be 2.
+          shuffles = stage.getNumShuffles();
         }
       }
 
-      assertEquals(2, partitionIds.size());
+      assertEquals(2, shuffles);
       executeString("DROP TABLE testnumshufflepartition PURGE").close();
     } finally {
       testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_GROUPBY_PARTITION_VOLUME.varname,

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 952e26a..52e7b54 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -28,6 +28,7 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.common.TajoDataTypes;
@@ -77,9 +78,9 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
   @Test
   public final void testCreateColumnPartitionedTable() throws Exception {
-    ResultSet res = null;
+    ResultSet res;
     String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTable");
-
+    ClientProtos.SubmitQueryResponse response;
     if (nodeType == NodeType.INSERT) {
       res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
@@ -89,16 +90,22 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
       assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
 
-      res = testBase.execute(
-        "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
-          "l_quantity from lineitem");
+      response = client.executeQuery(
+          "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
+              "l_quantity from lineitem");
     } else {
-      res = testBase.execute(
-        "create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) "
-          + " as select l_orderkey, l_partkey, l_quantity from lineitem");
+      response = client.executeQuery(
+          "create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) "
+              + " as select l_orderkey, l_partkey, l_quantity from lineitem");
     }
 
-    MasterPlan plan = getQueryPlan(res);
+    QueryId queryId = new QueryId(response.getQueryId());
+    testingCluster.waitForQuerySubmitted(queryId, 10);
+    QueryMasterTask queryMasterTask = testingCluster.getQueryMasterTask(queryId);
+    assertNotNull(queryMasterTask);
+    TajoClientUtil.waitCompletion(client, queryId);
+
+    MasterPlan plan = queryMasterTask.getQuery().getPlan();
     ExecutionBlock rootEB = plan.getRoot();
 
     assertEquals(1, plan.getChildCount(rootEB.getId()));
@@ -120,15 +127,15 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
     verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
-      tableDesc.getStats().getNumRows());
+        tableDesc.getStats().getNumRows());
 
     executeString("DROP TABLE " + tableName + " PURGE").close();
-    res.close();
   }
 
   @Test
   public final void testCreateColumnPartitionedTableWithJoin() throws Exception {
-    ResultSet res = null;
+    ResultSet res;
+    ClientProtos.SubmitQueryResponse response;
     String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin");
 
     if (nodeType == NodeType.INSERT) {
@@ -140,18 +147,23 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
       assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
 
-      res = testBase.execute(
-        "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
-          "l_quantity from lineitem join orders on l_orderkey = o_orderkey");
+      response = client.executeQuery(
+          "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
+              "l_quantity from lineitem join orders on l_orderkey = o_orderkey");
 
     } else {
-      res = testBase.execute("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "
-        + " AS select l_orderkey, l_partkey, l_quantity from lineitem join orders on l_orderkey = o_orderkey");
+      response = client.executeQuery("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "
+          + " AS select l_orderkey, l_partkey, l_quantity from lineitem join orders on l_orderkey = o_orderkey");
     }
 
-    MasterPlan plan = getQueryPlan(res);
-    ExecutionBlock rootEB = plan.getRoot();
+    QueryId queryId = new QueryId(response.getQueryId());
+    testingCluster.waitForQuerySubmitted(queryId, 10);
+    QueryMasterTask queryMasterTask = testingCluster.getQueryMasterTask(queryId);
+    assertNotNull(queryMasterTask);
+    TajoClientUtil.waitCompletion(client, queryId);
 
+    MasterPlan plan = queryMasterTask.getQuery().getPlan();
+    ExecutionBlock rootEB = plan.getRoot();
     assertEquals(1, plan.getChildCount(rootEB.getId()));
 
     ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
@@ -173,7 +185,6 @@ public class TestTablePartitions extends QueryTestCaseBase {
       tableDesc.getStats().getNumRows());
 
     executeString("DROP TABLE " + tableName + " PURGE").close();
-    res.close();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java
index 6775c84..ef25e86 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java
@@ -18,13 +18,11 @@
 
 package org.apache.tajo.master;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.jdbc.FetchResultSet;
-import org.apache.tajo.jdbc.TajoMemoryResultSet;
-import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 
 import java.sql.ResultSet;
@@ -61,14 +59,4 @@ public class TestQueryResult extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
-
-  private QueryId getQueryId(ResultSet resultSet) {
-    if (resultSet instanceof TajoMemoryResultSet) {
-      return ((TajoMemoryResultSet) resultSet).getQueryId();
-    } else if (resultSet instanceof FetchResultSet) {
-      return ((FetchResultSet) resultSet).getQueryId();
-    } else {
-      throw new IllegalArgumentException(resultSet.toString());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
index 978d709..a43491b 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
@@ -24,11 +24,16 @@ import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.util.history.QueryHistory;
+import org.apache.tajo.util.history.StageHistory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.List;
+
 import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
@@ -80,17 +85,16 @@ public class TestQueryState {
       queryState = client.getQueryStatus(queryId);
     }
 
-    QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
-    Query query = qmt.getQuery();
+    QueryInfo queryInfo = cluster.getMaster().getContext().getQueryJobManager().getFinishedQuery(queryId);
+    assertEquals(queryId, queryInfo.getQueryId());
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, queryInfo.getQueryState());
 
-    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, qmt.getState());
-    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getSynchronizedState());
-    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getState());
+    QueryHistory history = cluster.getQueryHistory(queryId);
+    List<StageHistory> stages = history.getStageHistories();
 
-    assertFalse(query.getStages().isEmpty());
-    for (Stage stage : query.getStages()) {
-      assertEquals(StageState.SUCCEEDED, stage.getSynchronizedState());
-      assertEquals(StageState.SUCCEEDED, stage.getState());
+    assertFalse(stages.isEmpty());
+    for (StageHistory stage : stages) {
+      assertEquals(StageState.SUCCEEDED.toString(), stage.getState());
     }
 
     /* get status from TajoMaster */

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
index b468e37..4654d38 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java
@@ -19,18 +19,21 @@
 package org.apache.tajo.querymaster;
 
 import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.worker.TajoWorker;
+import org.apache.tajo.util.history.QueryHistory;
+import org.apache.tajo.util.history.StageHistory;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.sql.ResultSet;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.*;
@@ -58,8 +61,9 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
       long[] expectedNumRows = new long[]{5, 2, 2, 2};
       long[] expectedNumBytes = new long[]{604, 18, 18, 8};
       long[] expectedReadBytes = new long[]{604, 604, 18, 0};
+      QueryId queryId = getQueryId(res);
 
-      assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
+      assertStatus(queryId, 2, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {
       cleanupQuery(res);
     }
@@ -77,7 +81,8 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
       long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194};
       long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0};
 
-      assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes);
+      QueryId queryId = getQueryId(res);
+      assertStatus(queryId, 3, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {
       cleanupQuery(res);
     }
@@ -105,7 +110,8 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
       long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 18};
       long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0};
 
-      assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes);
+      QueryId queryId = getQueryId(res);
+      assertStatus(queryId, 4, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {
       cleanupQuery(res);
     }
@@ -128,61 +134,37 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase {
     res.close();
   }
 
-  private void assertStatus(int numStages,
+  private void assertStatus(QueryId queryId, int numStages,
                             long[] expectedNumRows,
                             long[] expectedNumBytes,
                             long[] expectedReadBytes) throws Exception {
-      List<TajoWorker> tajoWorkers = testingCluster.getTajoWorkers();
-      Collection<QueryMasterTask> finishedTasks = null;
-      for (TajoWorker eachWorker: tajoWorkers) {
-        finishedTasks = eachWorker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks();
-        if (finishedTasks != null && !finishedTasks.isEmpty()) {
-          break;
-        }
-      }
-
-      assertNotNull(finishedTasks);
-      assertTrue(!finishedTasks.isEmpty());
-
-      List<QueryMasterTask> finishedTaskList = new ArrayList<QueryMasterTask>(finishedTasks);
 
-      Collections.sort(finishedTaskList, new Comparator<QueryMasterTask>() {
-        @Override
-        public int compare(QueryMasterTask o1, QueryMasterTask o2) {
-          return o2.getQueryId().compareTo(o1.getQueryId());
-        }
-      });
 
-      Query query = finishedTaskList.get(0).getQuery();
+      QueryHistory queryHistory  = testingCluster.getQueryHistory(queryId);
 
-      assertNotNull(query);
+      assertNotNull(queryHistory);
 
-      List<Stage> stages = new ArrayList<Stage>(query.getStages());
+      List<StageHistory> stages = queryHistory.getStageHistories();
       assertEquals(numStages, stages.size());
 
-      Collections.sort(stages, new Comparator<Stage>() {
+      Collections.sort(stages, new Comparator<StageHistory>() {
         @Override
-        public int compare(Stage o1, Stage o2) {
-          return o1.getId().compareTo(o2.getId());
+        public int compare(StageHistory o1, StageHistory o2) {
+          return o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId());
         }
       });
 
       int index = 0;
-      for (Stage eachStage : stages) {
-        TableStats inputStats = eachStage.getInputStats();
-        TableStats resultStats = eachStage.getResultStats();
+      for (StageHistory eachStage : stages) {
 
-        assertNotNull(inputStats);
-        assertEquals(expectedNumRows[index], inputStats.getNumRows().longValue());
-        assertEquals(expectedNumBytes[index], inputStats.getNumBytes().longValue());
-        assertEquals(expectedReadBytes[index], inputStats.getReadBytes().longValue());
+        assertEquals(expectedNumRows[index], eachStage.getTotalReadRows());
+        assertEquals(expectedNumBytes[index], eachStage.getTotalInputBytes());
+        assertEquals(expectedReadBytes[index], eachStage.getTotalReadBytes());
 
         index++;
 
-        assertNotNull(resultStats);
-        assertEquals(expectedNumRows[index], resultStats.getNumRows().longValue());
-        assertEquals(expectedNumBytes[index], resultStats.getNumBytes().longValue());
-        assertEquals(expectedReadBytes[index], resultStats.getReadBytes().longValue());
+        assertEquals(expectedNumRows[index], eachStage.getTotalWriteRows());
+        assertEquals(expectedNumBytes[index],eachStage.getTotalWriteBytes());
 
         index++;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 95562ed..ba421bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -35,13 +35,14 @@ import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest;
 import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.scheduler.QuerySchedulingInfo;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.session.Session;
-import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.*;
@@ -61,9 +62,9 @@ public class QueryManager extends CompositeService {
   private AsyncDispatcher dispatcher;
 
   private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
-
   private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
-  private final LRUMap historyCache = new LRUMap(HistoryReader.DEFAULT_PAGE_SIZE);
+
+  private LRUMap historyCache;
 
   private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
   private AtomicLong maxExecutionTime = new AtomicLong();
@@ -83,6 +84,8 @@ public class QueryManager extends CompositeService {
 
       this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
 
+      TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+      this.historyCache = new LRUMap(tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_CACHE_SIZE));
     } catch (Exception e) {
       LOG.error("Failed to init service " + getName() + " by exception " + e, e);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index a029802..cce9482 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -30,29 +30,28 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tajo.QueryId;
-
+import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest;
+import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse;
+import org.apache.tajo.ResourceProtos.WorkerConnectionsResponse;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.error.Errors.ResultCode;
 import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
-import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest;
-import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse;
-import org.apache.tajo.ResourceProtos.WorkerConnectionsResponse;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.master.event.QueryStopEvent;
 import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.TUtil;
-import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter.WriterFuture;
+import org.apache.tajo.util.history.HistoryWriter.WriterHolder;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.worker.TajoWorker;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -76,8 +75,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
 
-  private final LRUMap
-      finishedQueryMasterTasksCache = new LRUMap(HistoryReader.DEFAULT_PAGE_SIZE);
+  private LRUMap finishedQueryMasterTasksCache;
 
   private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
 
@@ -89,8 +87,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private QueryHeartbeatThread queryHeartbeatThread;
 
-  private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread;
-
   private TajoWorker.WorkerContext workerContext;
 
   private RpcClientManager manager;
@@ -114,6 +110,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     queryMasterContext = new QueryMasterContext(systemConf);
 
     clock = new SystemClock();
+    finishedQueryMasterTasksCache = new LRUMap(systemConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_CACHE_SIZE));
 
     this.dispatcher = new AsyncDispatcher();
     addIfService(dispatcher);
@@ -134,9 +131,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
     clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
     clientSessionTimeoutCheckThread.start();
 
-    finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread();
-    finishedQueryMasterTaskCleanThread.start();
-
     eventExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
     singleEventExecutor = Executors.newSingleThreadExecutor();
     super.serviceStart();
@@ -155,10 +149,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
       clientSessionTimeoutCheckThread.interrupt();
     }
 
-    if(finishedQueryMasterTaskCleanThread != null) {
-      finishedQueryMasterTaskCleanThread.interrupt();
-    }
-
     if(eventExecutor != null){
       eventExecutor.shutdown();
     }
@@ -210,7 +200,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
     return queryMasterTasks.get(queryId);
   }
 
-  @Deprecated
   public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
     QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
     if (queryMasterTask != null) {
@@ -234,10 +223,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
     return queryMasterTasks.values();
   }
 
-  @Deprecated
-  public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
-    synchronized (finishedQueryMasterTasksCache) {
-      return new ArrayList<QueryMasterTask>(finishedQueryMasterTasksCache.values());
+  public QueryHistory getQueryHistory(QueryId queryId) throws IOException {
+    QueryMasterTask queryMasterTask = getQueryMasterTask(queryId, true);
+    if(queryMasterTask != null) {
+      return queryMasterTask.getQuery().getQueryHistory();
+    } else {
+      return workerContext.getHistoryReader().getQueryHistory(queryId.toString());
     }
   }
 
@@ -284,7 +275,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return dispatcher.getEventHandler();
     }
 
-    public void stopQuery(QueryId queryId) {
+    public void stopQuery(final QueryId queryId) {
       QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
       if(queryMasterTask == null) {
         LOG.warn("No query info:" + queryId);
@@ -325,8 +316,19 @@ public class QueryMaster extends CompositeService implements EventHandler {
         QueryHistory queryHisory = query.getQueryHistory();
         if (queryHisory != null) {
           try {
+            WriterFuture<WriterHolder> writerFuture = new WriterFuture<WriterHolder>(queryHisory) {
+              @Override
+              public void done(WriterHolder writerHolder) {
+                super.done(writerHolder);
+
+                //remove memory cache, if history file writer is done
+                synchronized (finishedQueryMasterTasksCache) {
+                  finishedQueryMasterTasksCache.remove(queryId);
+                }
+              }
+            };
             query.context.getQueryMasterContext().getWorkerContext().
-                getTaskHistoryWriter().appendAndFlush(queryHisory);
+                getTaskHistoryWriter().appendHistory(writerFuture);
           } catch (Throwable e) {
             LOG.warn(e, e);
           }
@@ -455,49 +457,4 @@ public class QueryMaster extends CompositeService implements EventHandler {
       }
     }
   }
-
-  class FinishedQueryMasterTaskCleanThread extends Thread {
-    public void run() {
-      int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_CACHE_EXPIRE_PERIOD);
-      LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
-      while(!isStopped) {
-        try {
-          synchronized (this) {
-            this.wait(60 * 1000);  // minimum interval minutes
-          }
-        } catch (InterruptedException e) {
-          break;
-        }
-        try {
-          long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000l;
-          cleanExpiredFinishedQueryMasterTask(expireTime);
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    }
-
-    private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
-      List<Object> finishedList;
-      synchronized (finishedQueryMasterTasksCache) {
-        finishedList = new ArrayList<Object>(finishedQueryMasterTasksCache.values());
-      }
-
-      for(Object finishedTask: finishedList) {
-        QueryMasterTask queryMasterTask = (QueryMasterTask) finishedTask;
-          /* If a query are abnormal termination, the finished time will be zero. */
-        long finishedTime = queryMasterTask.getStartTime();
-        Query query = queryMasterTask.getQuery();
-        if (query != null && query.getFinishTime() > 0) {
-          finishedTime = query.getFinishTime();
-        }
-
-        if(finishedTime < expireTime) {
-          synchronized (finishedQueryMasterTasksCache) {
-            finishedQueryMasterTasksCache.remove(queryMasterTask.getQueryId());
-          }
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 12e4366..2ded786 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -21,6 +21,7 @@ package org.apache.tajo.querymaster;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.event.Event;
@@ -458,9 +459,8 @@ public class Stage implements EventHandler<StageEvent> {
     long totalReadRows = 0;
     long totalWriteBytes = 0;
     long totalWriteRows = 0;
-    int numShuffles = 0;
+
     for(Task eachTask : getTasks()) {
-      numShuffles = eachTask.getShuffleOutpuNum();
       if (eachTask.getLastAttempt() != null) {
         TableStats inputStats = eachTask.getLastAttempt().getInputStats();
         if (inputStats != null) {
@@ -476,12 +476,17 @@ public class Stage implements EventHandler<StageEvent> {
       }
     }
 
+    Set<Integer> partitions = Sets.newHashSet();
+    for (IntermediateEntry entry : getHashShuffleIntermediateEntries()) {
+       partitions.add(entry.getPartId());
+    }
+
     stageHistory.setTotalInputBytes(totalInputBytes);
     stageHistory.setTotalReadBytes(totalReadBytes);
     stageHistory.setTotalReadRows(totalReadRows);
     stageHistory.setTotalWriteBytes(totalWriteBytes);
     stageHistory.setTotalWriteRows(totalWriteRows);
-    stageHistory.setNumShuffles(numShuffles);
+    stageHistory.setNumShuffles(partitions.size());
     stageHistory.setProgress(getProgress());
     return stageHistory;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index c51b4e6..5fca7a7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -136,6 +136,13 @@ public class HistoryWriter extends AbstractService {
     return future;
   }
 
+  public void appendHistory(WriterFuture<WriterHolder> future) {
+    historyQueue.add(future);
+    synchronized (writerThread) {
+      writerThread.notifyAll();
+    }
+  }
+
   /* asynchronously flush to history file */
   public WriterFuture<WriterHolder> appendAndFlush(History history) {
     WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history) {
@@ -524,7 +531,7 @@ public class HistoryWriter extends AbstractService {
     return new Path(fileParent, processName + "_" + hour + "_" + maxSeq + HISTORY_FILE_POSTFIX);
   }
 
-  static class WriterHolder implements Closeable {
+  public static class WriterHolder implements Closeable {
     long lastWritingTime;
     Path path;
     FSDataOutputStream out;
@@ -542,7 +549,7 @@ public class HistoryWriter extends AbstractService {
     }
   }
 
-  static class WriterFuture<T> implements Future<T> {
+  public static class WriterFuture<T> implements Future<T> {
     private boolean done = false;
     private T result;
     private History history;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 281e23e..26b0d08 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -31,7 +31,6 @@ import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
 import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
@@ -117,14 +116,7 @@ public class TajoWorkerClientService extends AbstractService {
 
       try {
         QueryId queryId = new QueryId(request.getQueryId());
-
-        QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
-        QueryHistory queryHistory = null;
-        if (queryMasterTask == null) {
-          queryHistory = workerContext.getHistoryReader().getQueryHistory(queryId.toString());
-        } else {
-          queryHistory = queryMasterTask.getQuery().getQueryHistory();
-        }
+        QueryHistory queryHistory = workerContext.getQueryMaster().getQueryHistory(queryId);
 
         if (queryHistory != null) {
           builder.setQueryHistory(queryHistory.getProto());


Mime
View raw message