tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [3/3] tajo git commit: TAJO-1026: Implement Query history persistency manager.
Date Tue, 11 Nov 2014 01:32:12 GMT
TAJO-1026: Implement Query history persistency manager.

Closes #179


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e01b00a7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e01b00a7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e01b00a7

Branch: refs/heads/master
Commit: e01b00a7b05b98cc407ced04dad1c2f2144d0f88
Parents: a3e5bdd
Author: HyoungJun Kim <babokim@babokim-mbp.server.gruter.com>
Authored: Tue Nov 11 10:30:22 2014 +0900
Committer: HyoungJun Kim <babokim@babokim-mbp.server.gruter.com>
Committed: Tue Nov 11 10:30:22 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   4 +-
 .../org/apache/tajo/client/QueryClient.java     |   6 +
 .../org/apache/tajo/client/QueryClientImpl.java |  52 +++
 .../org/apache/tajo/client/TajoClientImpl.java  |  13 +-
 tajo-client/src/main/proto/ClientProtos.proto   |  60 +++
 .../main/proto/QueryMasterClientProtocol.proto  |   1 +
 .../main/proto/TajoMasterClientProtocol.proto   |   1 +
 .../java/org/apache/tajo/conf/TajoConf.java     |  26 ++
 .../java/org/apache/tajo/master/TajoMaster.java |  21 +
 .../tajo/master/TajoMasterClientService.java    |  33 ++
 .../apache/tajo/master/querymaster/Query.java   |  39 ++
 .../master/querymaster/QueryInProgress.java     |   3 +
 .../tajo/master/querymaster/QueryInfo.java      |  79 +++-
 .../tajo/master/querymaster/QueryMaster.java    |  89 ++--
 .../tajo/master/querymaster/QueryUnit.java      |  79 ++++
 .../tajo/master/querymaster/SubQuery.java       |  76 ++++
 .../main/java/org/apache/tajo/util/JSPUtil.java | 162 ++++++-
 .../org/apache/tajo/util/history/History.java   |  27 ++
 .../tajo/util/history/HistoryCleaner.java       | 136 ++++++
 .../apache/tajo/util/history/HistoryReader.java | 308 +++++++++++++
 .../apache/tajo/util/history/HistoryWriter.java | 450 +++++++++++++++++++
 .../apache/tajo/util/history/QueryHistory.java  | 151 +++++++
 .../tajo/util/history/QueryUnitHistory.java     | 167 +++++++
 .../tajo/util/history/SubQueryHistory.java      | 270 +++++++++++
 .../java/org/apache/tajo/worker/TajoWorker.java |  20 +
 .../tajo/worker/TajoWorkerClientService.java    |  34 ++
 .../main/java/org/apache/tajo/worker/Task.java  |   5 +-
 .../org/apache/tajo/worker/TaskHistory.java     |   8 +-
 .../src/main/resources/webapps/admin/query.jsp  |  72 +--
 .../resources/webapps/admin/querydetail.jsp     | 116 +++++
 .../main/resources/webapps/admin/querytasks.jsp | 249 ++++++++++
 .../main/resources/webapps/admin/queryunit.jsp  | 134 ++++++
 .../resources/webapps/worker/querydetail.jsp    |  43 +-
 .../resources/webapps/worker/querytasks.jsp     | 126 ++++--
 .../resources/webapps/worker/taskhistory.jsp    | 123 +++++
 .../org/apache/tajo/client/TestTajoClient.java  |  54 +++
 .../java/org/apache/tajo/util/TestJSPUtil.java  |  37 +-
 .../util/history/TestHistoryWriterReader.java   | 251 +++++++++++
 38 files changed, 3384 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 665313b..6c53920 100644
--- a/CHANGES
+++ b/CHANGES
@@ -5,12 +5,14 @@ Release 0.9.1 - unreleased
 
   NEW FEATURES
 
+    TAJO-1026: Implement Query history persistency manager. (Hyoungjun Kim)
+
     TAJO-233: Support PostgreSQL CatalogStore. (Jihun Kang via hyunsik)
 
     TAJO-235: Support Oracle CatalogStore. (Jihun Kang via hyunsik)
 
   IMPROVEMENT
-
+   
     TAJO-1133: Add 'bin/tajo version' command. (Jihun Kang via hyunsik)
 
     TAJO-1145: Add 'bin/tajo --help' command. (Jihun Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
index 59ef52b..9b24663 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
@@ -22,6 +22,8 @@ import com.google.protobuf.ServiceException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
+import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 
 import java.io.Closeable;
@@ -111,4 +113,8 @@ public interface QueryClient extends Closeable {
   public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException;
 
   public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException;
+
+  public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException;
+
+  public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index f92c9bf..5b78959 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -618,4 +618,56 @@ public class QueryClientImpl implements QueryClient {
     }
     return status;
   }
+
+  public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
+    return new ServerCallable<QueryInfoProto>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+      public QueryInfoProto call(NettyClientBase client) throws ServiceException {
+        connection.checkSessionAndGet(client);
+
+        QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setQueryId(queryId.getProto());
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
+        if (res.getResultCode() == ResultCode.OK) {
+          return res.getQueryInfo();
+        } else {
+          abort();
+          throw new ServiceException(res.getErrorMessage());
+        }
+      }
+    }.withRetries();
+  }
+
+  public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
+    final QueryInfoProto queryInfo = getQueryInfo(queryId);
+
+    if (queryInfo.getHostNameOfQM() == null || queryInfo.getQueryMasterClientPort() == 0) {
+      return null;
+    }
+    InetSocketAddress qmAddress = new InetSocketAddress(
+        queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
+
+    return new ServerCallable<QueryHistoryProto>(connection.connPool, qmAddress,
+        QueryMasterClientProtocol.class, false, true) {
+      public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
+        connection.checkSessionAndGet(client);
+
+        QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setQueryId(queryId.getProto());
+
+        QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
+        GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
+        if (res.getResultCode() == ResultCode.OK) {
+          return res.getQueryHistory();
+        } else {
+          abort();
+          throw new ServiceException(res.getErrorMessage());
+        }
+      }
+    }.withRetries();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 75de492..1d637ed 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -32,10 +32,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
-import org.apache.tajo.ipc.ClientProtos.GetQueryResultResponse;
-import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
-import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.util.NetUtils;
@@ -158,6 +155,14 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
     return queryClient.getClusterInfo();
   }
 
+  public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
+    return queryClient.getQueryInfo(queryId);
+  }
+
+  public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
+    return queryClient.getQueryHistory(queryId);
+  }
+
   /*------------------------------------------------------------------------*/
   // CatalogClient wrappers
   /*------------------------------------------------------------------------*/

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 23ae6dd..4118458 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -234,3 +234,63 @@ message FunctionResponse {
   repeated FunctionDescProto functions = 2;
   optional string errorMessage = 3;
 }
+
+message QueryInfoProto {
+  required string queryId = 1;
+  optional string sql = 2;
+  optional QueryState queryState = 3;
+  optional float progress = 4;
+  optional int64 startTime = 5;
+  optional int64 finishTime = 6;
+  optional string lastMessage = 7;
+  optional string hostNameOfQM = 8;
+  optional int32 queryMasterPort = 9;
+  optional int32 queryMasterClientPort = 10;
+  optional int32 queryMasterInfoPort = 11;
+}
+
+message SubQueryHistoryProto {
+  required string executionBlockId =1;
+  required string state = 2;
+  optional int64 startTime = 3;
+  optional int64 finishTime = 4;
+  optional int32 succeededObjectCount = 5;
+  optional int32 failedObjectCount = 6;
+  optional int32 killedObjectCount = 7;
+  optional int32 totalScheduledObjectsCount = 8;
+
+  optional int64 totalInputBytes = 9;
+  optional int64 totalReadBytes = 10;
+  optional int64 totalReadRows = 11;
+  optional int64 totalWriteBytes = 12;
+  optional int64 totalWriteRows = 13;
+  optional int32 numShuffles = 14;
+  optional float progress =15;
+
+  optional string plan = 16;
+  optional int32 hostLocalAssigned = 17;
+  optional int32 rackLocalAssigned = 18;
+}
+
+message QueryHistoryProto {
+  required string queryId = 1;
+  optional string queryMaster = 2;
+  optional int32 httpPort = 3;
+  optional string logicalPlan = 4;
+  optional string distributedPlan = 5;
+  repeated KeyValueProto sessionVariables = 6;
+  repeated SubQueryHistoryProto subQueryHistories = 7;
+}
+
+message GetQueryHistoryResponse {
+  required ResultCode resultCode = 1;
+  optional QueryHistoryProto queryHistory = 2;
+  optional string errorMessage = 3;
+}
+
+message GetQueryInfoResponse {
+  required ResultCode resultCode = 1;
+  optional QueryInfoProto queryInfo = 2;
+  optional string errorMessage = 3;
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
index 9d96505..3d8d70b 100644
--- a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
+++ b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
@@ -32,4 +32,5 @@ service QueryMasterClientProtocolService {
   rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
   rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
   rpc closeQuery(QueryIdProto) returns (BoolProto);
+  rpc getQueryHistory(QueryIdRequest) returns (GetQueryHistoryResponse);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
index 1afc069..bc59617 100644
--- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
+++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
@@ -51,6 +51,7 @@ service TajoMasterClientProtocolService {
   rpc killQuery(QueryIdRequest) returns (BoolProto);
   rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
   rpc closeNonForwardQuery(QueryIdRequest) returns (BoolProto);
+  rpc getQueryInfo(QueryIdRequest) returns (GetQueryInfoResponse);
 
   // Database Management APIs
   rpc createDatabase(SessionedStringProto) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 786aed0..3966410 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -296,6 +296,11 @@ public class TajoConf extends Configuration {
     // Metrics ----------------------------------------------------------------
     METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties"),
 
+    // Query History  ---------------------------------------------------------
+    HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"),
+    HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"),
+    HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7),
+
     // Misc -------------------------------------------------------------------
 
     // Geo IP
@@ -678,6 +683,27 @@ public class TajoConf extends Configuration {
     return new Path(stagingDirString);
   }
 
+  public static Path getQueryHistoryDir(TajoConf conf) throws IOException {
+    String historyDirString = conf.getVar(ConfVars.HISTORY_QUERY_DIR);
+    if (!hasScheme(historyDirString)) {
+      Path stagingPath = getStagingDir(conf);
+      FileSystem fs = stagingPath.getFileSystem(conf);
+      Path path = new Path(fs.getUri().toString(), historyDirString);
+      conf.setVar(ConfVars.HISTORY_QUERY_DIR, path.toString());
+      return path;
+    }
+    return new Path(historyDirString);
+  }
+
+  public static Path getTaskHistoryDir(TajoConf conf) throws IOException {
+    String historyDirString = conf.getVar(ConfVars.HISTORY_TASK_DIR);
+    if (!hasScheme(historyDirString)) {
+      //Local dir
+      historyDirString = "file://" + historyDirString;
+    }
+    return new Path(historyDirString);
+  }
+
   public static Path getSystemConfPath(TajoConf conf) {
     String systemConfPathStr = conf.getVar(ConfVars.SYSTEM_CONF_PATH);
     if (systemConfPathStr == null || systemConfPathStr.equals("")) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 25e1be5..17658ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -52,6 +52,8 @@ import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.*;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
 import org.apache.tajo.webapp.QueryExecutorServlet;
 import org.apache.tajo.webapp.StaticHttpServer;
 
@@ -125,6 +127,10 @@ public class TajoMaster extends CompositeService {
 
   private JvmPauseMonitor pauseMonitor;
 
+  private HistoryWriter historyWriter;
+
+  private HistoryReader historyReader;
+
   public TajoMaster() throws Exception {
     super(TajoMaster.class.getName());
   }
@@ -309,6 +315,13 @@ public class TajoMaster extends CompositeService {
     } catch (IOException e) {
       LOG.error(e.getMessage(), e);
     }
+
+    historyWriter = new HistoryWriter(getMasterName(), true);
+    historyWriter.init(getConfig());
+    addIfService(historyWriter);
+    historyWriter.start();
+
+    historyReader = new HistoryReader(getMasterName(), context.getConf());
   }
 
   private void writeSystemConf() throws IOException {
@@ -452,6 +465,14 @@ public class TajoMaster extends CompositeService {
     public HAService getHAService() {
       return haService;
     }
+
+    public HistoryWriter getHistoryWriter() {
+      return historyWriter;
+    }
+
+    public HistoryReader getHistoryReader() {
+      return historyReader;
+    }
   }
 
   String getThreadTaskName(long id, String name) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index a4688d9..2c81cd0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -57,6 +57,8 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.history.QueryHistory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -537,6 +539,37 @@ public class TajoMasterClientService extends AbstractService {
       }
     }
 
+    @Override
+    public GetQueryInfoResponse getQueryInfo(RpcController controller, QueryIdRequest request) throws ServiceException {
+      GetQueryInfoResponse.Builder builder = GetQueryInfoResponse.newBuilder();
+
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        QueryId queryId = new QueryId(request.getQueryId());
+
+        QueryJobManager queryJobManager = context.getQueryJobManager();
+        QueryInProgress queryInProgress = queryJobManager.getQueryInProgress(queryId);
+
+        QueryInfo queryInfo = null;
+        if (queryInProgress == null) {
+          queryInfo = context.getHistoryReader().getQueryInfo(queryId.toString());
+        } else {
+          queryInfo = queryInProgress.getQueryInfo();
+        }
+
+        if (queryInfo != null) {
+          builder.setQueryInfo(queryInfo.getProto());
+        }
+        builder.setResultCode(ResultCode.OK);
+      } catch (Throwable t) {
+        LOG.warn(t.getMessage(), t);
+        builder.setResultCode(ResultCode.ERROR);
+        builder.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(t));
+      }
+
+      return builder.build();
+    }
+
     /**
      * It is invoked by TajoContainerProxy.
      */

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index a16d36a..7114d39 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -52,6 +52,8 @@ import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.history.QueryHistory;
+import org.apache.tajo.util.history.SubQueryHistory;
 
 import java.io.IOException;
 import java.text.NumberFormat;
@@ -287,6 +289,42 @@ public class Query implements EventHandler<QueryEvent> {
     finishTime = clock.getTime();
   }
 
+  public QueryHistory getQueryHistory() {
+    QueryHistory queryHistory = makeQueryHistory();
+    queryHistory.setSubQueryHistories(makeSubQueryHistories());
+    return queryHistory;
+  }
+
+  private List<SubQueryHistory> makeSubQueryHistories() {
+    List<SubQueryHistory> subQueryHistories = new ArrayList<SubQueryHistory>();
+    for(SubQuery eachSubQuery: getSubQueries()) {
+      subQueryHistories.add(eachSubQuery.getSubQueryHistory());
+    }
+
+    return subQueryHistories;
+  }
+
+  private QueryHistory makeQueryHistory() {
+    QueryHistory queryHistory = new QueryHistory();
+
+    queryHistory.setQueryId(getId().toString());
+    queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName());
+    queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort());
+    queryHistory.setLogicalPlan(plan.toString());
+    queryHistory.setLogicalPlan(plan.getLogicalPlan().toString());
+    queryHistory.setDistributedPlan(plan.toString());
+
+    List<String[]> sessionVariables = new ArrayList<String[]>();
+    for(Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) {
+      if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) {
+        sessionVariables.add(new String[]{entry.getKey(), entry.getValue()});
+      }
+    }
+    queryHistory.setSessionVariables(sessionVariables);
+
+    return queryHistory;
+  }
+
   public List<String> getDiagnostics() {
     readLock.lock();
     try {
@@ -385,6 +423,7 @@ public class Query implements EventHandler<QueryEvent> {
       }
       query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
       query.setFinishTime();
+
       return finalState;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 536778a..d949ca4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -146,6 +146,8 @@ public class QueryInProgress extends CompositeService {
     if(queryMasterRpc != null) {
       RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
     }
+
+    masterContext.getHistoryWriter().appendHistory(queryInfo);
     super.stop();
   }
 
@@ -175,6 +177,7 @@ public class QueryInProgress extends CompositeService {
       queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
       queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
       queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
+      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
 
       getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index 955c5b3..00b95ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -19,21 +19,41 @@
 package org.apache.tajo.master.querymaster;
 
 
+import com.google.gson.annotations.Expose;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.util.history.History;
 
-public class QueryInfo {
+public class QueryInfo implements GsonObject, History {
   private QueryId queryId;
+  @Expose
   private String sql;
-  private String jsonExpr;
+  @Expose
   private TajoProtos.QueryState queryState;
+  @Expose
   private float progress;
+  @Expose
   private long startTime;
+  @Expose
   private long finishTime;
+  @Expose
   private String lastMessage;
+  @Expose
   private String hostNameOfQM;
+  @Expose
   private int queryMasterPort;
+  @Expose
   private int queryMasterClientPort;
+  @Expose
+  private int queryMasterInfoPort;
+  @Expose
+  private String queryIdStr;
+
+  private String jsonExpr;
 
   public QueryInfo(QueryId queryId) {
     this(queryId, null, null);
@@ -41,6 +61,7 @@ public class QueryInfo {
 
   public QueryInfo(QueryId queryId, String sql, String jsonExpr) {
     this.queryId = queryId;
+    this.queryIdStr = queryId.toString();
     this.sql = sql;
     this.jsonExpr = jsonExpr;
     this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
@@ -60,7 +81,14 @@ public class QueryInfo {
 
   public void setQueryMaster(String hostName) {
     this.hostNameOfQM = hostName;
+  }
+
+  public int getQueryMasterInfoPort() {
+    return queryMasterInfoPort;
+  }
 
+  public void setQueryMasterInfoPort(int queryMasterInfoPort) {
+    this.queryMasterInfoPort = queryMasterInfoPort;
   }
 
   public void setQueryMasterPort(int port) {
@@ -128,4 +156,51 @@ public class QueryInfo {
   public String getJsonExpr() {
     return jsonExpr;
   }
+
+  @Override
+  public String toJson() {
+    return CoreGsonHelper.toJson(this, QueryInfo.class);
+  }
+
+  @Override
+  public HistoryType getHistoryType() {
+    return HistoryType.QUERY_SUMMARY;
+  }
+
+  public static QueryInfo fromJson(String json) {
+    QueryInfo queryInfo = CoreGsonHelper.fromJson(json, QueryInfo.class);
+    queryInfo.queryId = TajoIdUtils.parseQueryId(queryInfo.queryIdStr);
+    return queryInfo;
+  }
+
+  public String getQueryIdStr() {
+    return queryIdStr;
+  }
+
+  public QueryInfoProto getProto() {
+    QueryInfoProto.Builder builder = QueryInfoProto.newBuilder();
+
+    builder.setQueryId(queryId.toString())
+        .setQueryState(queryState)
+        .setProgress(progress)
+        .setStartTime(startTime)
+        .setFinishTime(finishTime)
+        .setQueryMasterPort(queryMasterPort)
+        .setQueryMasterClientPort(queryMasterClientPort)
+        .setQueryMasterInfoPort(queryMasterInfoPort);
+
+    if (sql != null) {
+      builder.setSql(sql);
+    }
+
+    if (lastMessage != null) {
+      builder.setLastMessage(lastMessage);
+    }
+
+    if (hostNameOfQM != null) {
+      builder.setHostNameOfQM(hostNameOfQM);
+    }
+
+    return builder.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 7c3d799..b3b4dbb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -44,6 +44,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.util.ArrayList;
@@ -393,58 +394,64 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
 
     public void stopQuery(QueryId queryId) {
-      QueryMasterTask queryMasterTask;
-      queryMasterTask = queryMasterTasks.remove(queryId);
-      if(queryMasterTask == null) return;
+      QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId);
+      if(queryMasterTask == null) {
+        LOG.warn("No query info:" + queryId);
+        return;
+      }
 
       finishedQueryMasterTasks.put(queryId, queryMasterTask);
 
-      if(queryMasterTask != null) {
-        TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
-        CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
+      TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
+      CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
 
-        NettyClientBase tmClient = null;
-        try {
-          // In TajoMaster HA mode, if backup master be active status,
-          // worker may fail to connect existing active master. Thus,
-          // if worker can't connect the master, worker should try to connect another master and
-          // update master address in worker context.
-          if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-            try {
-              tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                  TajoMasterProtocol.class, true);
-            } catch (Exception e) {
-              queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
-              queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-              tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                  TajoMasterProtocol.class, true);
-            }
-          } else {
+      NettyClientBase tmClient = null;
+      try {
+        // In TajoMaster HA mode, if backup master be active status,
+        // worker may fail to connect existing active master. Thus,
+        // if worker can't connect the master, worker should try to connect another master and
+        // update master address in worker context.
+        if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+          try {
+            tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+                TajoMasterProtocol.class, true);
+          } catch (Exception e) {
+            queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+            queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
             tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
                 TajoMasterProtocol.class, true);
           }
+        } else {
+          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
+        }
 
-          TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-          masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
-        }  catch (Exception e) {
-          //this function will be closed in new thread.
-          //When tajo do stop cluster, tajo master maybe throw closed connection exception
+        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
+      }  catch (Exception e) {
+        //this function will be closed in new thread.
+        //When tajo do stop cluster, tajo master maybe throw closed connection exception
 
-          LOG.error(e.getMessage(), e);
-        } finally {
-          connPool.releaseConnection(tmClient);
-        }
+        LOG.error(e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(tmClient);
+      }
 
-        try {
-          queryMasterTask.stop();
-          if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
-            cleanup(queryId);
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
+      try {
+        queryMasterTask.stop();
+        if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
+          cleanup(queryId);
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+      Query query = queryMasterTask.getQuery();
+      if (query != null) {
+        QueryHistory queryHisory = query.getQueryHistory();
+        if (queryHisory != null) {
+          query.context.getQueryMasterContext().getWorkerContext().
+              getTaskHistoryWriter().appendHistory(queryHisory);
         }
-      } else {
-        LOG.warn("No query info:" + queryId);
       }
       if(workerContext.isYarnContainerMode()) {
         stop();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index fe2752f..0f275e9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -32,6 +32,7 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
 import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
@@ -42,8 +43,10 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.util.history.QueryUnitHistory;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.net.URI;
@@ -84,6 +87,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
   private QueryUnitAttemptId successfulAttempt;
   private String succeededHost;
+  private int succeededHostPort;
   private int succeededPullServerPort;
 
   private int failedAttempts;
@@ -96,6 +100,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
   private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
 
+  private QueryUnitHistory finalQueryUnitHistory;
+
   protected static final StateMachineFactory
       <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
       new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
@@ -214,6 +220,76 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     }
   }
 
+  /**
+   * Reports the state of the most recent attempt of this query unit.
+   *
+   * @return the last attempt's state, or {@code TaskAttemptState.TA_ASSIGNED}
+   *         when there is no last attempt
+   */
+  public TaskAttemptState getLastAttemptStatus() {
+    QueryUnitAttempt latest = getLastAttempt();
+    if (latest == null) {
+      return TaskAttemptState.TA_ASSIGNED;
+    }
+    return latest.getState();
+  }
+
+  /**
+   * Returns the history record of this query unit.
+   *
+   * When no final history has been stored yet, a fresh snapshot is built and
+   * returned without being stored. When a final history exists but its finish
+   * time is still 0, it is rebuilt and the stored copy is replaced.
+   *
+   * @return the stored final history, or a newly built snapshot
+   */
+  public QueryUnitHistory getQueryUnitHistory() {
+    if (finalQueryUnitHistory == null) {
+      return makeQueryUnitHistory();
+    }
+
+    if (finalQueryUnitHistory.getFinishTime() == 0) {
+      finalQueryUnitHistory = makeQueryUnitHistory();
+    }
+    return finalQueryUnitHistory;
+  }
+
+  /**
+   * Builds a history snapshot from the current state of this query unit.
+   *
+   * Id, state and progress are copied from the last attempt and are left
+   * unset when there is no last attempt. The snapshot also carries the host
+   * and port, retry count, launch/finish times, shuffle information,
+   * fragments, fetch URIs and data locations.
+   *
+   * @return a new {@code QueryUnitHistory}
+   */
+  private QueryUnitHistory makeQueryUnitHistory() {
+    QueryUnitHistory snapshot = new QueryUnitHistory();
+
+    QueryUnitAttempt latest = getLastAttempt();
+    if (latest != null) {
+      snapshot.setId(latest.getId().toString());
+      snapshot.setState(latest.getState().toString());
+      snapshot.setProgress(latest.getProgress());
+    }
+    // String concatenation: the value is never null here, even when
+    // succeededHost has not been assigned (it then begins with "null").
+    snapshot.setHostAndPort(succeededHost + ":" + succeededHostPort);
+    snapshot.setRetryCount(this.getRetryCount());
+    snapshot.setLaunchTime(launchTime);
+    snapshot.setFinishTime(finishTime);
+
+    // Only the first shuffle output is recorded as key / file name.
+    snapshot.setNumShuffles(getShuffleOutpuNum());
+    if (!getShuffleFileOutputs().isEmpty()) {
+      ShuffleFileOutput firstOutput = getShuffleFileOutputs().get(0);
+      if (snapshot.getNumShuffles() > 0) {
+        snapshot.setShuffleKey("" + firstOutput.getPartId());
+        snapshot.setShuffleFileName(firstOutput.getFileName());
+      }
+    }
+
+    // Fragments, rendered through FileFragment.toString().
+    List<String> fragmentTexts = new ArrayList<String>();
+    for (FragmentProto proto : getAllFragments()) {
+      FileFragment converted = FragmentConvertor.convert(FileFragment.class, proto);
+      fragmentTexts.add(converted.toString());
+    }
+    snapshot.setFragments(fragmentTexts.toArray(new String[0]));
+
+    // One {fetch map key, uri} pair per simple URI of every fetch.
+    List<String[]> fetchPairs = new ArrayList<String[]>();
+    for (Map.Entry<String, Set<FetchImpl>> entry : getFetchMap().entrySet()) {
+      for (FetchImpl fetch : entry.getValue()) {
+        for (URI uri : fetch.getSimpleURIs()) {
+          fetchPairs.add(new String[] {entry.getKey(), uri.toString()});
+        }
+      }
+    }
+    snapshot.setFetchs(fetchPairs.toArray(new String[0][]));
+
+    List<String> locationTexts = new ArrayList<String>();
+    for (DataLocation location : getDataLocations()) {
+      locationTexts.add(location.toString());
+    }
+    snapshot.setDataLocations(locationTexts.toArray(new String[0]));
+
+    return snapshot;
+  }
+
 	public void setLogicalPlan(LogicalNode plan) {
 	  this.plan = plan;
 
@@ -488,6 +564,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
   private void finishTask() {
     this.finishTime = System.currentTimeMillis();
+    finalQueryUnitHistory = makeQueryUnitHistory();
   }
 
   private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
@@ -527,6 +604,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
       task.successfulAttempt = attemptEvent.getTaskAttemptId();
       task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
+      task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
       task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
 
       task.finishTask();
@@ -542,6 +620,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
       task.launchTime = System.currentTimeMillis();
       task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
+      task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 91fd22d..96534df 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -64,6 +64,8 @@ import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.history.QueryUnitHistory;
+import org.apache.tajo.util.history.SubQueryHistory;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
@@ -280,6 +282,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private TaskSchedulerContext schedulerContext;
   private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
   private AtomicInteger completeReportReceived = new AtomicInteger(0);
+  private SubQueryHistory finalSubQueryHistory;
 
   public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan,
                   ExecutionBlock block, StorageManager sm) {
@@ -394,6 +397,76 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     tasks.put(task.getId(), task);
   }
 
+  /**
+   * Returns the history record of this subquery.
+   *
+   * When no final history has been stored, a fresh snapshot is returned; that
+   * snapshot is built by {@code makeSubQueryHistory()} alone, so this method
+   * does not attach query unit histories to it. When a final history exists
+   * but its finish time is still 0, it is rebuilt together with its query
+   * unit histories and the stored copy is replaced.
+   *
+   * @return the stored final history, or a newly built snapshot
+   */
+  public SubQueryHistory getSubQueryHistory() {
+    if (finalSubQueryHistory == null) {
+      return makeSubQueryHistory();
+    }
+
+    if (finalSubQueryHistory.getFinishTime() == 0) {
+      finalSubQueryHistory = makeSubQueryHistory();
+      finalSubQueryHistory.setQueryUnits(makeQueryUnitHistories());
+    }
+    return finalSubQueryHistory;
+  }
+
+  /**
+   * Collects the history record of every query unit of this subquery.
+   *
+   * @return a new list with one {@code QueryUnitHistory} per query unit, in
+   *         the iteration order of {@code getQueryUnits()}
+   */
+  private List<QueryUnitHistory> makeQueryUnitHistories() {
+    List<QueryUnitHistory> collected = new ArrayList<QueryUnitHistory>();
+    for (QueryUnit unit : getQueryUnits()) {
+      collected.add(unit.getQueryUnitHistory());
+    }
+    return collected;
+  }
+
+  /**
+   * Builds a history snapshot from the current state of this subquery.
+   *
+   * Input and output statistics are summed over the last attempt of every
+   * query unit; units without a last attempt, or without the respective
+   * stats, contribute nothing. The query unit histories are not attached
+   * here.
+   *
+   * @return a new {@code SubQueryHistory}
+   */
+  private SubQueryHistory makeSubQueryHistory() {
+    SubQueryHistory snapshot = new SubQueryHistory();
+
+    snapshot.setExecutionBlockId(getId().toString());
+    snapshot.setPlan(PlannerUtil.buildExplainString(block.getPlan()));
+    snapshot.setState(getState().toString());
+    snapshot.setStartTime(startTime);
+    snapshot.setFinishTime(finishTime);
+    snapshot.setSucceededObjectCount(succeededObjectCount);
+    snapshot.setKilledObjectCount(killedObjectCount);
+    snapshot.setFailedObjectCount(failedObjectCount);
+    snapshot.setTotalScheduledObjectsCount(totalScheduledObjectsCount);
+    snapshot.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned());
+    snapshot.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned());
+
+    long inputBytes = 0;
+    long readBytes = 0;
+    long readRows = 0;
+    long writeBytes = 0;
+    long writeRows = 0;
+    int shuffleCount = 0;
+    for (QueryUnit unit : getQueryUnits()) {
+      // Overwritten on every iteration: ends up holding the value of the
+      // last query unit visited, not a sum.
+      shuffleCount = unit.getShuffleOutpuNum();
+      if (unit.getLastAttempt() == null) {
+        continue;
+      }
+
+      TableStats in = unit.getLastAttempt().getInputStats();
+      if (in != null) {
+        inputBytes += in.getNumBytes();
+        readBytes += in.getReadBytes();
+        readRows += in.getNumRows();
+      }
+
+      TableStats out = unit.getLastAttempt().getResultStats();
+      if (out != null) {
+        writeBytes += out.getNumBytes();
+        writeRows += out.getNumRows();
+      }
+    }
+
+    snapshot.setTotalInputBytes(inputBytes);
+    snapshot.setTotalReadBytes(readBytes);
+    snapshot.setTotalReadRows(readRows);
+    snapshot.setTotalWriteBytes(writeBytes);
+    snapshot.setTotalWriteRows(writeRows);
+    snapshot.setNumShuffles(shuffleCount);
+    snapshot.setProgress(getProgress());
+    return snapshot;
+  }
+
   /**
    * It finalizes this subquery. It is only invoked when the subquery is succeeded.
    */
@@ -1172,6 +1245,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
       getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
     }
+
+    this.finalSubQueryHistory = makeSubQueryHistory();
+    this.finalSubQueryHistory.setQueryUnits(makeQueryUnitHistories());
   }
 
   public List<IntermediateEntry> getHashShuffleIntermediateEntries() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 9d0dcaa..6d3597d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -23,10 +23,14 @@ import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.ha.HAService;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.util.history.QueryUnitHistory;
+import org.apache.tajo.util.history.SubQueryHistory;
 import org.apache.tajo.worker.TaskRunnerHistory;
 import org.apache.tajo.worker.TaskRunner;
 
@@ -38,7 +42,7 @@ import static org.apache.tajo.conf.TajoConf.ConfVars;
 public class JSPUtil {
   static DecimalFormat decimalF = new DecimalFormat("###.0");
 
-  public static void sortQueryUnit(QueryUnit[] queryUnits, String sortField, String sortOrder) {
+  public static void sortQueryUnitArray(QueryUnit[] queryUnits, String sortField, String sortOrder) {
     if(sortField == null || sortField.isEmpty()) {
       sortField = "id";
     }
@@ -46,6 +50,22 @@ public class JSPUtil {
     Arrays.sort(queryUnits, new QueryUnitComparator(sortField, "asc".equals(sortOrder)));
   }
 
+  /**
+   * Sorts the given list of query units in place.
+   *
+   * @param queryUnits the list to sort; it is modified
+   * @param sortField  the field to sort by; {@code null} or empty means "id"
+   * @param sortOrder  the comparator is built with its ascending flag set
+   *                   only when this is exactly "asc"
+   */
+  public static void sortQueryUnit(List<QueryUnit> queryUnits, String sortField, String sortOrder) {
+    String effectiveField = (sortField == null || sortField.isEmpty()) ? "id" : sortField;
+    boolean ascending = "asc".equals(sortOrder);
+
+    Collections.sort(queryUnits, new QueryUnitComparator(effectiveField, ascending));
+  }
+
+  /**
+   * Sorts the given list of query unit histories in place.
+   *
+   * @param queryUnits the list to sort; it is modified
+   * @param sortField  the field to sort by; {@code null} or empty means "id"
+   * @param sortOrder  "asc" sorts ascending; any other value, including
+   *                   {@code null}, sorts descending
+   */
+  public static void sortQueryUnitHistory(List<QueryUnitHistory> queryUnits, String sortField, String sortOrder) {
+    String effectiveField = (sortField == null || sortField.isEmpty()) ? "id" : sortField;
+    boolean ascending = "asc".equals(sortOrder);
+
+    Collections.sort(queryUnits, new QueryUnitHistoryComparator(effectiveField, ascending));
+  }
+
   public static void sortTaskRunner(List<TaskRunner> taskRunners) {
     Collections.sort(taskRunners, new Comparator<TaskRunner>() {
       @Override
@@ -147,6 +167,43 @@ public class JSPUtil {
     return subQueryList;
   }
 
+  /**
+   * Returns the subquery histories ordered by start time, then by execution
+   * block id.
+   *
+   * A start time of 0 is treated as {@code Long.MAX_VALUE}, so such entries
+   * sort after every entry that has a start time. The input collection is
+   * copied and left untouched.
+   *
+   * @param subQueries the histories to order
+   * @return a new, sorted list
+   */
+  public static List<SubQueryHistory> sortSubQueryHistory(Collection<SubQueryHistory> subQueries) {
+    Comparator<SubQueryHistory> byStartTimeThenBlockId = new Comparator<SubQueryHistory>() {
+      @Override
+      public int compare(SubQueryHistory left, SubQueryHistory right) {
+        long leftStart = left.getStartTime();
+        long rightStart = right.getStartTime();
+        if (leftStart == 0) {
+          leftStart = Long.MAX_VALUE;
+        }
+        if (rightStart == 0) {
+          rightStart = Long.MAX_VALUE;
+        }
+
+        int byStart = compareLong(leftStart, rightStart);
+        if (byStart != 0) {
+          return byStart;
+        }
+        return left.getExecutionBlockId().compareTo(right.getExecutionBlockId());
+      }
+    };
+
+    List<SubQueryHistory> sorted = new ArrayList<SubQueryHistory>(subQueries);
+    Collections.sort(sorted, byStartTimeThenBlockId);
+    return sorted;
+  }
+
+  /**
+   * Builds the HTML label that tells whether this master is active or backup.
+   *
+   * @param context the master context whose HA service is inspected
+   * @return an empty string when the context has no HA service, otherwise a
+   *         colored "(active)" or "(backup)" HTML fragment
+   */
+  public static String getMasterActiveLabel(MasterContext context) {
+    HAService haService = context.getHAService();
+    if (haService == null) {
+      return "";
+    }
+
+    if (haService.isActiveStatus()) {
+      return "<font color='#1e90ff'>(active)</font>";
+    }
+    return "<font color='#1e90ff'>(backup)</font>";
+  }
+
   static class QueryUnitComparator implements Comparator<QueryUnit> {
     private String sortField;
     private boolean asc;
@@ -194,6 +251,53 @@ public class JSPUtil {
     }
   }
 
+  /**
+   * Orders {@code QueryUnitHistory} records by a named field, ascending or
+   * descending.
+   *
+   * Supported fields are "id", "host", "runTime" and "startTime"; any other
+   * field name falls back to ordering by id. For "host", a null host-and-port
+   * value is compared as "-". In descending "runTime" order, records whose
+   * launch time is 0 are placed after all records that have a launch time.
+   */
+  static class QueryUnitHistoryComparator implements Comparator<QueryUnitHistory> {
+    private String sortField;
+    private boolean asc;
+
+    /**
+     * @param sortField name of the field to compare
+     * @param asc       {@code true} for ascending order, {@code false} for descending
+     */
+    public QueryUnitHistoryComparator(String sortField, boolean asc) {
+      this.sortField = sortField;
+      this.asc = asc;
+    }
+
+    @Override
+    public int compare(QueryUnitHistory queryUnit, QueryUnitHistory queryUnit2) {
+      if(asc) {
+        if("id".equals(sortField)) {
+          return queryUnit.getId().compareTo(queryUnit2.getId());
+        } else if("host".equals(sortField)) {
+          String host1 = queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort();
+          String host2 = queryUnit2.getHostAndPort() == null ? "-" : queryUnit2.getHostAndPort();
+          return host1.compareTo(host2);
+        } else if("runTime".equals(sortField)) {
+          return compareLong(queryUnit.getRunningTime(), queryUnit2.getRunningTime());
+        } else if("startTime".equals(sortField)) {
+          return compareLong(queryUnit.getLaunchTime(), queryUnit2.getLaunchTime());
+        } else {
+          return queryUnit.getId().compareTo(queryUnit2.getId());
+        }
+      } else {
+        if("id".equals(sortField)) {
+          return queryUnit2.getId().compareTo(queryUnit.getId());
+        } else if("host".equals(sortField)) {
+          String host1 = queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort();
+          String host2 = queryUnit2.getHostAndPort() == null ? "-" : queryUnit2.getHostAndPort();
+          return host2.compareTo(host1);
+        } else if("runTime".equals(sortField)) {
+          boolean launched1 = queryUnit.getLaunchTime() != 0;
+          boolean launched2 = queryUnit2.getLaunchTime() != 0;
+          if(!launched1 && !launched2) {
+            // Two unlaunched records are equal. Returning -1 here for both
+            // argument orders would break the Comparator contract
+            // (sgn(compare(a, b)) must equal -sgn(compare(b, a))), which
+            // Collections.sort may reject with an IllegalArgumentException.
+            return 0;
+          } else if(!launched2) {
+            return -1;
+          } else if(!launched1) {
+            return 1;
+          }
+          return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime());
+        } else if("startTime".equals(sortField)) {
+          return compareLong(queryUnit2.getLaunchTime(), queryUnit.getLaunchTime());
+        } else {
+          return queryUnit2.getId().compareTo(queryUnit.getId());
+        }
+      }
+    }
+  }
+
   static int compareLong(long a, long b) {
     if(a > b) {
       return 1;
@@ -246,4 +350,60 @@ public class JSPUtil {
 
     return result;
   }
+
+  /**
+   * Renders the HTML page navigation bar for a paged listing.
+   *
+   * Pages are shown in groups of ten. The current page is printed as plain
+   * text and every other page as a {@code [n]} link. A {@code <} link to the
+   * last page of the previous group is emitted when the current group is not
+   * the first one, and a {@code >} link to the first page of the next group
+   * when more groups follow. Entries are separated by two non-breaking spaces.
+   *
+   * @param currentPage the 1-based page being displayed
+   * @param totalPage   the total number of pages
+   * @param url         the link target prefix; {@code &page=N} is appended to
+   *                    it as-is
+   * @return the navigation HTML fragment
+   */
+  public static String getPageNavigation(int currentPage, int totalPage, String url) {
+    final String separator = "&nbsp;&nbsp;";
+    final int groupIndex = (currentPage - 1) / 10;
+    final int lastGroupIndex = (totalPage - 1) / 10;
+
+    StringBuilder html = new StringBuilder();
+    boolean needSeparator = false;
+
+    // Link back to the last page of the previous group of ten.
+    if (groupIndex > 0) {
+      html.append("<a href='").append(url);
+      html.append("&page=").append(groupIndex * 10).append("'>");
+      html.append("&lt;</a>");
+      needSeparator = true;
+    }
+
+    for (int offset = 1; offset <= 10; offset++) {
+      int page = groupIndex * 10 + offset;
+      if (needSeparator) {
+        html.append(separator);
+      }
+      if (page != currentPage) {
+        html.append("<a href='").append(url);
+        html.append("&page=").append(page).append("'>");
+        html.append("[").append(page).append("]</a>");
+      } else {
+        html.append(page);
+      }
+      needSeparator = true;
+      if (page >= totalPage) {
+        break;
+      }
+    }
+
+    // Link forward to the first page of the next group of ten. The loop above
+    // always emits at least one entry, so a separator is always required here.
+    if (lastGroupIndex > groupIndex) {
+      html.append(separator);
+      html.append("<a href='").append(url);
+      html.append("&page=").append((groupIndex + 1) * 10 + 1).append("'>");
+      html.append("&gt;</a>");
+    }
+    return html.toString();
+  }
+
+  /**
+   * Returns the slice of {@code originList} that belongs to the given page.
+   *
+   * Out-of-range requests are handled without throwing: a page below 1 is
+   * treated as the first page, and a page past the end of the list (or a
+   * non-positive page size) yields an empty list. The start index is computed
+   * in {@code long} so that large page numbers cannot overflow.
+   *
+   * @param originList the full list; may be {@code null}
+   * @param page       the 1-based page number
+   * @param pageSize   the number of elements per page
+   * @return a new empty list when {@code originList} is {@code null} or the
+   *         page holds no elements; {@code originList} itself when it is
+   *         empty; otherwise a {@code subList} view of {@code originList}
+   */
+  public static <T extends Object> List<T> getPageNavigationList(List<T> originList, int page, int pageSize) {
+    if (originList == null) {
+      return new ArrayList<T>();
+    }
+    if (originList.isEmpty()) {
+      return originList;
+    }
+
+    long size = originList.size();
+    // Clamp instead of letting subList() throw on a negative or too-large index.
+    long start = Math.max(0L, ((long) page - 1) * pageSize);
+    long end = Math.min(size, start + Math.max(0, pageSize));
+    if (start >= end) {
+      return new ArrayList<T>();
+    }
+
+    return originList.subList((int) start, (int) end);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/history/History.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/History.java b/tajo-core/src/main/java/org/apache/tajo/util/history/History.java
new file mode 100644
index 0000000..0312d4c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/History.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.history;
+
+/**
+ * Common contract of history records: each record reports which kind of
+ * history it is.
+ */
+public interface History {
+  /** The kinds of history records. */
+  enum HistoryType {
+    TASK,
+    QUERY,
+    QUERY_SUMMARY
+  }
+
+  /**
+   * @return the kind of history record this object represents
+   */
+  HistoryType getHistoryType();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java
new file mode 100644
index 0000000..868dfcd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.history;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Background thread that periodically deletes expired history directories.
+ *
+ * Every 12 hours it lists the children of one history parent directory (the
+ * query history directory when created with {@code isMaster == true}, the
+ * task history directory otherwise), parses each child name as a
+ * {@code yyyyMMdd} date and recursively deletes the children whose date is
+ * older than the configured number of expiry days. Children whose name is not
+ * a date are skipped with a warning.
+ */
+public class HistoryCleaner extends Thread {
+  private static final Log LOG = LogFactory.getLog(HistoryCleaner.class);
+
+  private int historyExpireDays;
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private Path historyParentPath;
+  private Path taskHistoryParentPath;
+  private TajoConf tajoConf;
+  private boolean isMaster;
+
+  /**
+   * @param tajoConf configuration supplying the expiry days and both history
+   *                 directories
+   * @param isMaster {@code true} to clean the query history directory,
+   *                 {@code false} to clean the task history directory
+   * @throws IOException declared by the signature; presumably raised while
+   *                     resolving the history directories -- TODO confirm
+   */
+  public HistoryCleaner(TajoConf tajoConf, boolean isMaster) throws IOException {
+    super(HistoryCleaner.class.getName());
+
+    this.tajoConf = tajoConf;
+    this.isMaster = isMaster;
+    historyExpireDays = tajoConf.getIntVar(ConfVars.HISTORY_EXPIRY_TIME_DAY);
+    historyParentPath = tajoConf.getQueryHistoryDir(tajoConf);
+    taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
+  }
+
+  /**
+   * Asks the cleaner to terminate: sets the stop flag and interrupts the
+   * thread so that a pending sleep ends immediately.
+   */
+  public void doStop() {
+    stopped.set(true);
+    this.interrupt();
+  }
+
+  @Override
+  public void run() {
+    LOG.info("History cleaner started: expiry day=" + historyExpireDays);
+    SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+
+    while (!stopped.get()) {
+      try {
+        Thread.sleep(60 * 60 * 12 * 1000);   // 12 hours
+      } catch (InterruptedException e) {
+        // Intentionally ignored: doStop() interrupts the sleep and the stop
+        // flag is checked right below. The interrupt status is not restored
+        // because that would make every following sleep() fail immediately.
+      }
+
+      if (stopped.get()) {
+        break;
+      }
+
+      try {
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.DAY_OF_MONTH, -historyExpireDays);
+
+        long cleanTargetTime = cal.getTime().getTime();
+
+        if (isMaster) {
+          // Clean query history directory
+          cleanExpiredDirs(historyParentPath, "query", df, cleanTargetTime);
+        } else {
+          // Clean task history directory
+          cleanExpiredDirs(taskHistoryParentPath, "task", df, cleanTargetTime);
+        }
+      } catch (Exception e) {
+        // Keep the thread alive; the next pass runs after the next sleep.
+        LOG.error(e.getMessage(), e);
+      }
+    }
+    LOG.info("History cleaner stopped");
+  }
+
+  /**
+   * Deletes the children of {@code parentPath} whose yyyyMMdd name is older
+   * than {@code cleanTargetTime}.
+   *
+   * @param parentPath      directory whose children are date-named directories
+   * @param historyKind     word used in the log message ("query" or "task")
+   * @param df              formatter used to parse the child names
+   * @param cleanTargetTime cut-off in epoch milliseconds; older entries are deleted
+   * @throws IOException if the file system cannot be listed or modified
+   */
+  private void cleanExpiredDirs(Path parentPath, String historyKind, SimpleDateFormat df,
+                                long cleanTargetTime) throws IOException {
+    FileSystem fs = parentPath.getFileSystem(tajoConf);
+    if (!fs.exists(parentPath)) {
+      return;
+    }
+
+    FileStatus[] files = fs.listStatus(parentPath);
+    if (files == null) {
+      return;
+    }
+
+    for (FileStatus eachFile : files) {
+      String pathName = eachFile.getPath().getName();
+      long pathTime;
+      try {
+        pathTime = df.parse(pathName).getTime();
+      } catch (ParseException e) {
+        LOG.warn(eachFile.getPath() + " is not History directory format.");
+        continue;
+      }
+
+      if (pathTime < cleanTargetTime) {
+        LOG.info("Cleaning " + historyKind + " history dir: " + eachFile.getPath());
+        fs.delete(eachFile.getPath(), true);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
new file mode 100644
index 0000000..21bc725
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.history;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.worker.TaskHistory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class HistoryReader {
+  private static final Log LOG = LogFactory.getLog(HistoryReader.class);
+
+  public static final int DEFAULT_PAGE_SIZE = 100;
+  public static final int DEFAULT_TASK_PAGE_SIZE = 2000;
+  private String processName;
+  private TajoConf tajoConf;
+  private Path historyParentPath;
+  private Path taskHistoryParentPath;
+
+  /**
+   * Creates a reader bound to the history directories configured in {@code tajoConf}.
+   *
+   * @param processName name of the process whose task history files are read; every ':' is
+   *                    replaced with '_' and the result is lower-cased before it is stored
+   * @param tajoConf    configuration used to look up the query and task history directories
+   * @throws IOException declared for callers; presumably raised by the directory lookups
+   */
+  public HistoryReader(String processName, TajoConf tajoConf) throws IOException {
+    String normalizedName = processName.replace(":", "_");
+    this.processName = normalizedName.toLowerCase();
+    this.tajoConf = tajoConf;
+
+    this.historyParentPath = tajoConf.getQueryHistoryDir(tajoConf);
+    this.taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
+  }
+
+  public List<QueryInfo> getQueries(String keyword) throws IOException {
+    List<QueryInfo> queryInfos = new ArrayList<QueryInfo>();
+
+    FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf);
+    if (!fs.exists(historyParentPath)) {
+      return queryInfos;
+    }
+    FileStatus[] files = fs.listStatus(historyParentPath);
+    if (files == null || files.length == 0) {
+      return queryInfos;
+    }
+
+    for (FileStatus eachDateFile: files) {
+      if (eachDateFile.isFile()) {
+        continue;
+      }
+      FileStatus[] dateFiles = fs.listStatus(new Path(eachDateFile.getPath(), HistoryWriter.QUERY_LIST));
+      if (dateFiles == null || dateFiles.length == 0) {
+        continue;
+      }
+
+      for (FileStatus eachFile: dateFiles) {
+        if (eachFile.isDirectory()) {
+          continue;
+        }
+
+        Path path = eachFile.getPath();
+        if (!path.getName().endsWith(HistoryWriter.HISTORY_FILE_POSTFIX)) {
+          continue;
+        }
+
+        FSDataInputStream in = null;
+        try {
+          in = fs.open(path);
+
+          byte[] buf = new byte[100 * 1024];
+          while (true) {
+            int length = in.readInt();
+            if (length > buf.length) {
+              buf = new byte[length];
+            }
+            in.readFully(buf, 0, length);
+            String queryInfoJson = new String(buf, 0, length);
+            QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson);
+            if (keyword != null) {
+              if (queryInfo.getSql().indexOf(keyword) >= 0) {
+                queryInfos.add(queryInfo);
+              }
+            } else {
+              queryInfos.add(queryInfo);
+            }
+          }
+        } catch (EOFException e) {
+        } catch (Exception e) {
+          LOG.error("Reading error:" + path + ", " +e.getMessage(), e);
+        } finally {
+          in.close();
+        }
+      }
+    }
+
+    Collections.sort(queryInfos, new Comparator<QueryInfo>() {
+      @Override
+      public int compare(QueryInfo query1, QueryInfo query2) {
+        return query2.getQueryIdStr().toString().compareTo(query1.getQueryIdStr().toString());
+      }
+    });
+
+    return queryInfos;
+  }
+
+  /**
+   * Locates the query.hist file of a query.
+   *
+   * The date directory is derived from {@code startTime}. Because the file may have been written
+   * under a neighbouring date, the start day is checked first, then the previous day, then the
+   * next day.
+   *
+   * @param queryId   query id; when it has the form {@code q_<timestamp>_<seq>} the timestamp is
+   *                  used as start time if none is given
+   * @param startTime query start time in milliseconds, or 0 to derive it from the query id
+   *                  (falling back to the current time when the id has another form)
+   * @return the path of the existing history file, or null if none of the three days has one
+   * @throws IOException if the file system cannot be accessed
+   */
+  private Path getQueryHistoryFilePath(String queryId, long startTime) throws IOException {
+    if (startTime == 0) {
+      String[] tokens = queryId.split("_");
+      if (tokens.length == 3) {
+        startTime = Long.parseLong(tokens[1]);
+      } else {
+        startTime = System.currentTimeMillis();
+      }
+    }
+
+    // Day offsets in lookup order: same day, previous day, next day.
+    int[] dayOffsets = {0, -1, 1};
+    Calendar cal = Calendar.getInstance();
+    FileSystem fs = null;
+
+    for (int dayOffset : dayOffsets) {
+      cal.setTimeInMillis(startTime);
+      cal.add(Calendar.DAY_OF_MONTH, dayOffset);
+      // The shifted time must be used here; passing startTime again would probe the same
+      // directory three times.
+      Path queryHistoryPath =
+          HistoryWriter.getQueryHistoryFilePath(historyParentPath, queryId, cal.getTimeInMillis());
+      if (fs == null) {
+        fs = HistoryWriter.getNonCrcFileSystem(queryHistoryPath, tajoConf);
+      }
+      if (fs.exists(queryHistoryPath)) {
+        return queryHistoryPath;
+      }
+      LOG.info("No query history file: " + queryHistoryPath);
+    }
+    return null;
+  }
+
+  /**
+   * Reads the detail history of a query without a known start time.
+   *
+   * @param queryId id of the query
+   * @return whatever {@link #getQueryHistory(String, long)} returns for a start time of 0
+   * @throws IOException if reading the history file fails
+   */
+  public QueryHistory getQueryHistory(String queryId) throws IOException {
+    final long unknownStartTime = 0;
+    return getQueryHistory(queryId, unknownStartTime);
+  }
+
+  /**
+   * Reads the detail history of a query from its history file.
+   *
+   * The whole file is loaded into memory and parsed as JSON, so files larger than 10MB are
+   * rejected.
+   *
+   * @param queryId   id of the query
+   * @param startTime query start time in milliseconds, passed on to the file lookup
+   * @return the parsed history, or null when no history file was found
+   * @throws IOException if the file is too big or cannot be read
+   */
+  public QueryHistory getQueryHistory(String queryId, long startTime) throws IOException {
+    final Path historyFile = getQueryHistoryFilePath(queryId, startTime);
+    if (historyFile == null) {
+      return null;
+    }
+
+    final FileSystem fileSystem = HistoryWriter.getNonCrcFileSystem(historyFile, tajoConf);
+    final long fileLength = fileSystem.getFileStatus(historyFile).getLen();
+    if (fileLength > 10 * 1024 * 1024) {
+      throw new IOException("QueryHistory file is too big: " +
+          historyFile + ", " + fileLength + " bytes");
+    }
+
+    FSDataInputStream input = null;
+    try {
+      input = fileSystem.open(historyFile);
+      byte[] content = new byte[(int) fileLength];
+      input.readFully(content, 0, content.length);
+      return QueryHistory.fromJson(new String(content));
+    } finally {
+      if (input != null) {
+        input.close();
+      }
+    }
+  }
+
+  /**
+   * Reads the query unit histories of one execution block.
+   *
+   * The execution block file sits next to the query's history file and is named
+   * {@code <ebId>} plus the history file postfix. It is loaded into memory as a whole, so files
+   * larger than 100MB are rejected.
+   *
+   * @param queryId id of the query that owns the execution block
+   * @param ebId    id of the execution block
+   * @return the parsed query unit histories; an empty list when the query history file or the
+   *         execution block file does not exist
+   * @throws IOException if the file is too big or cannot be read
+   */
+  public List<QueryUnitHistory> getQueryUnitHistory(String queryId, String ebId) throws IOException {
+    Path queryFile = getQueryHistoryFilePath(queryId, 0);
+    if (queryFile == null) {
+      return new ArrayList<QueryUnitHistory>();
+    }
+
+    String ebFileName = ebId + HistoryWriter.HISTORY_FILE_POSTFIX;
+    Path ebFile = new Path(queryFile.getParent(), ebFileName);
+    FileSystem fileSystem = HistoryWriter.getNonCrcFileSystem(ebFile, tajoConf);
+    if (!fileSystem.exists(ebFile)) {
+      return new ArrayList<QueryUnitHistory>();
+    }
+
+    long fileLength = fileSystem.getFileStatus(ebFile).getLen();
+    if (fileLength > 100 * 1024 * 1024) {    // 100MB
+      throw new IOException("QueryUnitHistory file is too big: " +
+          ebFile + ", " + fileLength + " bytes");
+    }
+
+    FSDataInputStream input = null;
+    try {
+      input = fileSystem.open(ebFile);
+      byte[] content = new byte[(int) fileLength];
+      input.readFully(content, 0, content.length);
+      return SubQueryHistory.fromJsonQueryUnits(new String(content));
+    } finally {
+      if (input != null) {
+        input.close();
+      }
+    }
+  }
+
+  /**
+   * Finds the history of one task attempt in this process' task history files.
+   *
+   * Task history files are grouped by date directory and carry the hour in their name. The hour
+   * of {@code startTime} is searched first, then the previous hour, then the next hour. Each
+   * file is a sequence of records: a 4-byte length followed by a serialized TaskHistoryProto.
+   *
+   * @param queryUnitAttemptId string form of the attempt id to look for
+   * @param startTime          start time of the task in milliseconds
+   * @return the task history, or null if no record matches in any of the three hours
+   * @throws IOException if a history file cannot be listed or read
+   */
+  public TaskHistory getTaskHistory(String queryUnitAttemptId, long startTime) throws IOException {
+    FileSystem fs = HistoryWriter.getNonCrcFileSystem(taskHistoryParentPath, tajoConf);
+
+    SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
+
+    // Hour offsets in lookup order: current, current-1, current+1 hour.
+    int[] hourOffsets = {0, -1, 1};
+    Calendar cal = Calendar.getInstance();
+
+    for (int hourOffset : hourOffsets) {
+      cal.setTime(new Date(startTime));
+      cal.add(Calendar.HOUR_OF_DAY, hourOffset);
+      String historyFileDate = df.format(cal.getTime());
+
+      Path fileParent = new Path(taskHistoryParentPath, historyFileDate.substring(0, 8) + "/tasks/" + processName);
+      String hour = historyFileDate.substring(8, 10);
+
+      if (!fs.exists(fileParent)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Task history parent not exists:" + fileParent);
+        }
+        continue;
+      }
+
+      FileStatus[] files = fs.listStatus(fileParent);
+      if (files == null || files.length == 0) {
+        // Nothing in this directory; a neighbouring hour may live in another date directory,
+        // so keep looking instead of giving up.
+        continue;
+      }
+
+      String filePrefix = processName + "_" + hour + "_";
+
+      for (FileStatus eachFile : files) {
+        if (!eachFile.getPath().getName().startsWith(filePrefix)) {
+          continue;
+        }
+
+        FSDataInputStream in = null;
+        TaskHistoryProto.Builder builder = TaskHistoryProto.newBuilder();
+        try {
+          // The listing already carries the file length; no extra status lookup is needed.
+          LOG.info("Finding TaskHistory from " + eachFile.getLen() + "," + eachFile.getPath());
+
+          in = fs.open(eachFile.getPath());
+          while (true) {
+            int len = in.readInt();
+            byte[] buf = new byte[len];
+            in.readFully(buf, 0, len);
+
+            builder.clear();
+            TaskHistoryProto taskHistoryProto = builder.mergeFrom(buf).build();
+            QueryUnitAttemptId attemptId = new QueryUnitAttemptId(taskHistoryProto.getQueryUnitAttemptId());
+            if (attemptId.toString().equals(queryUnitAttemptId)) {
+              return new TaskHistory(taskHistoryProto);
+            }
+          }
+        } catch (EOFException e) {
+          // Expected: end of this file reached without a match; try the next file.
+        } finally {
+          if (in != null) {
+            in.close();
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  public QueryInfo getQueryInfo(String queryId) throws IOException {
+    List<QueryInfo> queries = getQueries(null);
+
+    if (queries != null) {
+      for (QueryInfo queryInfo: queries) {
+        if (queryId.equals(queryInfo.getQueryId().toString())) {
+          return queryInfo;
+        }
+      }
+    }
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
new file mode 100644
index 0000000..63a143b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.history;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.worker.TaskHistory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * History directory structure
+ *   tajo.query-history.path: hdfs
+ *   tajo.task-history.path: local or hdfs
+ *
+ *   <tajo.history.query.dir>/<yyyyMMdd>/query-list/query-list-<HHmmss>.hist (TajoMaster's query list, hourly rolling)
+ *                                      /query-detail/<QUERY_ID>/query.hist    (QueryMaster's query detail)
+ *                                                               /<EB_ID>.hist  (QueryMaster's subquery detail)
+ *   <tajo.history.task.dir>/<yyyyMMdd>/tasks/<WORKER_HOST>_<WORKER_PORT>/<WORKER_HOST>_<WORKER_PORT>_<HH>_<seq>.hist
+ * History files are kept for "tajo.history.expiry-time-day" (default value is 7 days)
+ */
+public class HistoryWriter extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(HistoryWriter.class);
+  public static final String QUERY_LIST = "query-list";
+  public static final String QUERY_DETAIL = "query-detail";
+  public static final String HISTORY_FILE_POSTFIX = ".hist";
+
+  private final LinkedBlockingQueue<History> historyQueue = new LinkedBlockingQueue<History>();
+  // key: yyyyMMddHH
+  private Map<String, WriterHolder> taskWriters = new HashMap<String, WriterHolder>();
+
+  // For TajoMaster's query list
+  private WriterHolder querySummaryWriter = null;
+
+  private WriterThread writerThread;
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private Path historyParentPath;
+  private Path taskHistoryParentPath;
+  private String processName;
+  private TajoConf tajoConf;
+  private HistoryCleaner historyCleaner;
+  private boolean isMaster;
+
+  /**
+   * Creates a history writer service for one process.
+   *
+   * @param processName name of the owning process; it is part of the service name as given, and
+   *                    is stored with every ':' replaced by '_' and lower-cased
+   * @param isMaster    flag handed to the history cleaner when the service is initialized
+   */
+  public HistoryWriter(String processName, boolean isMaster) {
+    super(HistoryWriter.class.getName() + ":" + processName);
+    this.isMaster = isMaster;
+    this.processName = processName.replace(":", "_").toLowerCase();
+  }
+
+  /**
+   * Resolves the history directories and creates (but does not start) the writer thread and
+   * the history cleaner.
+   *
+   * @param conf service configuration; it is cast to TajoConf, so any other Configuration
+   *             type fails with a ClassCastException
+   * @throws Exception if a history directory lookup or the superclass initialization fails
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    tajoConf = (TajoConf)conf;
+    historyParentPath = tajoConf.getQueryHistoryDir(tajoConf);
+    taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
+    // Both workers are only constructed here; serviceStart() starts them.
+    writerThread = new WriterThread();
+    historyCleaner = new HistoryCleaner(tajoConf, isMaster);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Stops the writer thread and the history cleaner and closes every open history stream.
+   *
+   * The writer thread is signalled before any stream is closed so that it stops picking up new
+   * work first. Failures while closing a stream are logged and do not prevent the remaining
+   * streams from being closed.
+   *
+   * @throws Exception if the superclass stop fails
+   */
+  @Override
+  public void serviceStop() throws Exception {
+    stopped.set(true);
+    // The thread does not exist yet when the service is stopped without having been initialized.
+    if (writerThread != null) {
+      writerThread.interrupt();
+    }
+
+    // Same monitor as the writer thread's hourly clean-up, which removes entries from this map.
+    synchronized (taskWriters) {
+      for (WriterHolder eachWriter : taskWriters.values()) {
+        if (eachWriter.out != null) {
+          try {
+            eachWriter.out.close();
+          } catch (Exception err) {
+            LOG.error(err.getMessage(), err);
+          }
+        }
+      }
+      taskWriters.clear();
+    }
+
+    if (querySummaryWriter != null && querySummaryWriter.out != null) {
+      try {
+        querySummaryWriter.out.close();
+      } catch (Exception err) {
+        LOG.error(err.getMessage(), err);
+      }
+    }
+
+    if (historyCleaner != null) {
+      historyCleaner.doStop();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Starts the writer thread and the history cleaner created by serviceInit().
+   *
+   * @throws Exception declared by the service contract
+   */
+  @Override
+  public void serviceStart() throws Exception {
+    writerThread.start();
+    historyCleaner.start();
+  }
+
+  /**
+   * Queues a history record for asynchronous writing.
+   *
+   * @param history the record to be written by the writer thread
+   */
+  public void appendHistory(History history) {
+    // The writer thread waits on the queue's monitor while the queue is empty; notifying under
+    // the same monitor wakes it up right after the record has been added.
+    synchronized (historyQueue) {
+      historyQueue.add(history);
+      historyQueue.notifyAll();
+    }
+  }
+
+  /**
+   * Returns the file system of {@code path} with checksum writing switched off when it is a
+   * local ("file" scheme) file system.
+   *
+   * See https://issues.apache.org/jira/browse/HADOOP-7844: if the FileSystem is a local
+   * CheckSumFileSystem, flushing doesn't touch the file length, so HistoryReader can't read
+   * a history file until it has been closed.
+   *
+   * @param path path whose file system is requested; it may lack a URI scheme
+   * @param conf configuration used to resolve the file system
+   * @return the file system that owns {@code path}
+   * @throws IOException if the file system cannot be resolved
+   */
+  public static FileSystem getNonCrcFileSystem(Path path, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+
+    String scheme = path.toUri().getScheme();
+    if (scheme == null) {
+      // A path such as "/tmp/history" has no scheme of its own; it was resolved against the
+      // default file system, so take the scheme from there.
+      scheme = fs.getUri().getScheme();
+    }
+    if ("file".equals(scheme)) {
+      fs.setWriteChecksum(false);
+    }
+
+    return fs;
+  }
+
+  /**
+   * Builds the path of a query's detail history file:
+   * {@code <historyParentPath>/<yyyyMMdd>/query-detail/<queryId>/query.hist}.
+   *
+   * @param historyParentPath root directory of the query history
+   * @param queryId           id of the query
+   * @param startTime         time in milliseconds that selects the date directory
+   * @return the history file path; the file is not required to exist
+   */
+  public static Path getQueryHistoryFilePath(Path historyParentPath, String queryId, long startTime) {
+    String dateDirName = new SimpleDateFormat("yyyyMMdd").format(startTime);
+    Path queryDetailDir = new Path(historyParentPath, dateDirName + "/" + QUERY_DETAIL);
+    String relativeFile = queryId + "/query" + HISTORY_FILE_POSTFIX;
+    return new Path(queryDetailDir, relativeFile);
+  }
+
+  /**
+   * Builds the path of a query's detail history file, taking the date directory from the
+   * timestamp embedded in the query id.
+   *
+   * A query id looks like {@code q_1412483083972_0005}, i.e. {@code q_<timestamp>_<seq>}. When
+   * the id has another form, or the timestamp is not a number, the current time is used.
+   *
+   * NOTE(review): formatting the raw token string always failed before, so files ended up
+   * under the current date; confirm that no caller depends on that.
+   *
+   * @param historyParentPath root directory of the query history
+   * @param queryId           id of the query
+   * @return the history file path; the file is not required to exist
+   */
+  public static Path getQueryHistoryFilePath(Path historyParentPath, String queryId) {
+    SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+
+    long timestamp = System.currentTimeMillis();
+    try {
+      String[] tokens = queryId.split("_");
+      //q_1412483083972_0005 = q_<timestamp>_<seq>
+      if (tokens.length == 3) {
+        // The token is text; it has to be parsed before it can be formatted as a date.
+        timestamp = Long.parseLong(tokens[1]);
+      }
+    } catch (Exception e) {
+      // Malformed query id: keep the current time as the fallback.
+    }
+
+    Path datePath = new Path(historyParentPath, df.format(new Date(timestamp)) + "/" + QUERY_DETAIL);
+    return new Path(datePath, queryId + "/query" + HISTORY_FILE_POSTFIX);
+  }
+
+  class WriterThread extends Thread {
+    /**
+     * Main loop of the writer thread: drains the history queue, writes what was drained and
+     * closes task history writers that are at least two hours old. It runs until the stopped
+     * flag is set.
+     */
+    public void run() {
+      LOG.info("HistoryWriter_"+ processName + " started.");
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
+      while (!stopped.get()) {
+        List<History> histories = new ArrayList<History>();
+        synchronized (historyQueue) {
+          historyQueue.drainTo(histories);
+          if (histories.isEmpty()) {
+            try {
+              // Woken up early by appendHistory(); otherwise the clean-up below still runs
+              // once a minute.
+              historyQueue.wait(60 * 1000);
+            } catch (InterruptedException e) {
+              if (stopped.get()) {
+                break;
+              }
+            }
+          }
+        }
+        // NOTE(review): records drained right before a stop are discarded here without
+        // being written.
+        if (stopped.get()) {
+          break;
+        }
+        try {
+          if (!histories.isEmpty()) {
+            writeHistory(histories);
+          }
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+
+        // Clean up task history writers.
+
+        // Close the writers whose hour key (yyyyMMddHH) is two or more hours in the past.
+        synchronized (taskWriters) {
+          Calendar cal = Calendar.getInstance();
+          cal.add(Calendar.HOUR_OF_DAY, -2);
+          String closeTargetTime = df.format(cal.getTime());
+          List<String> closingTargets = new ArrayList<String>();
+          // The nested synchronized blocks re-enter the monitor already held above.
+          synchronized (taskWriters) {
+            for (String eachWriterTime : taskWriters.keySet()) {
+              if (eachWriterTime.compareTo(closeTargetTime) <= 0) {
+                closingTargets.add(eachWriterTime);
+              }
+            }
+          }
+
+          // Keys are collected first so that entries are not removed while iterating the map.
+          for (String eachWriterTime : closingTargets) {
+            WriterHolder writerHolder = null;
+            synchronized (taskWriters) {
+              writerHolder = taskWriters.remove(eachWriterTime);
+            }
+            if (writerHolder != null) {
+              LOG.info("Closing task history file: " + writerHolder.path);
+              if (writerHolder.out != null) {
+                try {
+                  writerHolder.out.close();
+                } catch (IOException e) {
+                  LOG.error(e.getMessage(), e);
+                }
+              }
+            }
+          }
+        }
+      }
+      LOG.info("HistoryWriter_"+ processName + " stopped.");
+    }
+
+    /**
+     * Writes each history record with the writer that matches its history type.
+     *
+     * A failure on one record is logged and the remaining records are still processed.
+     * Records of an unknown type are logged and skipped.
+     *
+     * @param histories the records to write; an empty list is a no-op
+     */
+    public void writeHistory(List<History> histories) {
+      for (History history : histories) {
+        switch (history.getHistoryType()) {
+          case TASK:
+            // Task history of a worker.
+            try {
+              writeTaskHistory((TaskHistory) history);
+            } catch (Exception e) {
+              LOG.error("Error while saving task history: " +
+                  ((TaskHistory) history).getQueryUnitAttemptId() + ":" + e.getMessage(), e);
+            }
+            break;
+          case QUERY:
+            // Query detail history.
+            try {
+              writeQueryHistory((QueryHistory) history);
+            } catch (Exception e) {
+              LOG.error("Error while saving query history: " +
+                  ((QueryHistory) history).getQueryId() + ":" + e.getMessage(), e);
+            }
+            break;
+          case QUERY_SUMMARY:
+            // Entry of the query list.
+            try {
+              writeQuerySummary((QueryInfo) history);
+            } catch (Exception e) {
+              LOG.error("Error while saving query summary: " +
+                  ((QueryInfo) history).getQueryId() + ":" + e.getMessage(), e);
+            }
+            break;
+          default:
+            LOG.warn("Wrong history type: " + history.getHistoryType());
+        }
+      }
+    }
+
+    /**
+     * Writes the detail history of a query and of each of its sub queries.
+     *
+     * Files written (both as JSON text):
+     *   <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/query.hist
+     *   <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/<EB_ID>.hist
+     *
+     * If the query directory cannot be created the error is logged and nothing is written.
+     *
+     * @param queryHistory the query history to persist
+     * @throws Exception if creating or writing one of the files fails
+     */
+    private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Exception {
+      Path queryFile = getQueryHistoryFilePath(historyParentPath, queryHistory.getQueryId());
+      Path queryDir = queryFile.getParent();
+      FileSystem fileSystem = getNonCrcFileSystem(queryFile, tajoConf);
+
+      if (!fileSystem.exists(queryDir) && !fileSystem.mkdirs(queryDir)) {
+        LOG.error("Can't make QueryHistory dir: " + queryDir);
+        return;
+      }
+
+      // Query level file.
+      FSDataOutputStream stream = null;
+      try {
+        LOG.info("Saving query summary: " + queryFile);
+        stream = fileSystem.create(queryFile);
+        stream.write(queryHistory.toJson().getBytes());
+      } finally {
+        if (stream != null) {
+          try {
+            stream.close();
+          } catch (Exception err) {
+            LOG.error(err.getMessage(), err);
+          }
+        }
+      }
+
+      if (queryHistory.getSubQueryHistories() == null) {
+        return;
+      }
+
+      // One file per execution block, next to the query level file.
+      for (SubQueryHistory subQuery : queryHistory.getSubQueryHistories()) {
+        Path ebFile = new Path(queryDir, subQuery.getExecutionBlockId() + HISTORY_FILE_POSTFIX);
+        stream = null;
+        try {
+          stream = fileSystem.create(ebFile);
+          stream.write(subQuery.toQueryUnitsJson().getBytes());
+          LOG.info("Saving query unit: " + ebFile);
+        } finally {
+          if (stream != null) {
+            try {
+              stream.close();
+            } catch (Exception err) {
+              LOG.error(err.getMessage(), err);
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * Appends one query summary to the current query list file.
+     *
+     * The file is rolled when no file is open or when the open file was created an hour or
+     * more ago. A record is a 4-byte length followed by the JSON text wrapped in newlines.
+     *
+     * @param queryInfo the query summary to append
+     * @throws Exception if the query list file cannot be opened, rolled or written
+     */
+    private synchronized void writeQuerySummary(QueryInfo queryInfo) throws Exception {
+      // writing to HDFS and rolling hourly
+      if (querySummaryWriter == null) {
+        querySummaryWriter = new WriterHolder();
+      }
+
+      boolean rollingDue =
+          System.currentTimeMillis() - querySummaryWriter.lastWritingTime >= 60 * 60 * 1000;
+      if (querySummaryWriter.out != null && rollingDue) {
+        LOG.info("Close query history file: " + querySummaryWriter.path);
+        // Forget the stream before closing it so that a failing close() cannot leave a
+        // closed stream behind for the next call.
+        FSDataOutputStream previous = querySummaryWriter.out;
+        querySummaryWriter.out = null;
+        previous.close();
+      }
+
+      if (querySummaryWriter.out == null) {
+        rollingQuerySummaryWriter();
+      }
+      // Rolling returns without a stream when the date directory cannot be created.
+      if (querySummaryWriter.out == null) {
+        throw new IOException("Can't open query list history file for writing");
+      }
+
+      byte[] jsonBytes = ("\n" + queryInfo.toJson() + "\n").getBytes();
+
+      querySummaryWriter.out.writeInt(jsonBytes.length);
+      querySummaryWriter.out.write(jsonBytes);
+      querySummaryWriter.out.hflush();
+    }
+
+    /**
+     * Opens a new query list file named after the current time and makes it the target of the
+     * query summary writer:
+     * {@code <historyParentPath>/<yyyyMMdd>/query-list/query-list-<HHmmss>.hist}.
+     *
+     * If the date directory cannot be created the error is logged and the writer is left
+     * unchanged.
+     *
+     * @throws Exception if the directory check or the file creation fails
+     */
+    private synchronized void rollingQuerySummaryWriter() throws Exception {
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
+      String currentDateTime = df.format(new Date(System.currentTimeMillis()));
+
+      Path datePath = new Path(historyParentPath, currentDateTime.substring(0, 8) + "/" + QUERY_LIST);
+      FileSystem fs = getNonCrcFileSystem(datePath, tajoConf);
+      if (!fs.exists(datePath)) {
+        if (!fs.mkdirs(datePath)) {
+          // Report the directory that actually failed, not its parent.
+          LOG.error("Can't make QueryList history dir: " + datePath);
+          return;
+        }
+      }
+
+      Path historyFile = new Path(datePath, QUERY_LIST + "-" + currentDateTime.substring(8, 14) + HISTORY_FILE_POSTFIX);
+      querySummaryWriter.path = historyFile;
+      // Doubles as the creation time that drives the hourly rolling.
+      querySummaryWriter.lastWritingTime = System.currentTimeMillis();
+      LOG.info("Create query history file: " + historyFile);
+      querySummaryWriter.out = fs.create(historyFile);
+    }
+
+    /**
+     * Appends one task history record to the task history file of the task's start hour.
+     *
+     * Writers are kept per start hour (key: yyyyMMddHH) because task history files are rolled
+     * hourly. A record is a 4-byte length followed by the serialized proto.
+     *
+     * @param taskHistory the task history to append
+     * @throws Exception if the history file cannot be created or written
+     */
+    private synchronized void writeTaskHistory(TaskHistory taskHistory) throws Exception {
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
+
+      String taskStartTime = df.format(new Date(taskHistory.getStartTime()));
+
+      WriterHolder writerHolder = taskWriters.get(taskStartTime);
+      if (writerHolder == null) {
+        writerHolder = new WriterHolder();
+        writerHolder.out = createTaskHistoryFile(taskStartTime, writerHolder);
+        if (writerHolder.out == null) {
+          // Do not cache a holder without a stream: it would silently swallow every later
+          // record of this hour. Failing here lets the next record retry the creation.
+          throw new IOException("Can't create task history file for " + taskStartTime);
+        }
+        taskWriters.put(taskStartTime, writerHolder);
+      }
+      writerHolder.lastWritingTime = System.currentTimeMillis();
+
+      if (writerHolder.out != null) {
+        byte[] taskHistoryBytes = taskHistory.getProto().toByteArray();
+        writerHolder.out.writeInt(taskHistoryBytes.length);
+        writerHolder.out.write(taskHistoryBytes);
+        writerHolder.out.flush();
+      }
+    }
+
+    /**
+     * Creates the next task history file for the given start hour.
+     *
+     * @param taskStartTime start hour of the task, formatted as yyyyMMddHH
+     * @param writerHolder  holder that receives the path of the created file
+     * @return the stream of the new file, or null when its parent directory cannot be created
+     * @throws IOException if the file system cannot be accessed or the file cannot be created
+     */
+    private FSDataOutputStream createTaskHistoryFile(String taskStartTime, WriterHolder writerHolder) throws IOException {
+      FileSystem fileSystem = getNonCrcFileSystem(taskHistoryParentPath, tajoConf);
+      Path historyFile = getQueryTaskHistoryPath(fileSystem, taskHistoryParentPath, processName, taskStartTime);
+
+      // mkdirs is only attempted when the file itself does not exist yet.
+      boolean parentReady = fileSystem.exists(historyFile) || fileSystem.mkdirs(historyFile.getParent());
+      if (!parentReady) {
+        LOG.error("Can't make Query history directory: " + historyFile);
+        return null;
+      }
+
+      writerHolder.path = historyFile;
+      // overwrite=false: creation fails rather than replacing an existing file.
+      return fileSystem.create(historyFile, false);
+    }
+  }
+
+  /**
+   * Returns the path of the next task history file of a process for the given hour:
+   * {@code <parent>/<yyyyMMdd>/tasks/<processName>/<processName>_<HH>_<seq>.hist}.
+   *
+   * The sequence is one more than the largest sequence among the existing files of that hour,
+   * or 0 when there is none. Files are matched by the {@code <processName>_<HH>_} prefix, so
+   * process names that contain '_' themselves are handled as well.
+   *
+   * @param fs            file system that holds the task history
+   * @param parent        root directory of the task history
+   * @param processName   normalized process name used in directory and file names
+   * @param taskStartTime start hour of the task, formatted as yyyyMMddHH
+   * @return the path of the file to create next; it does not exist yet
+   * @throws IOException if the process directory exists but is not a directory, or the file
+   *                     system cannot be accessed
+   */
+  public static Path getQueryTaskHistoryPath(FileSystem fs, Path parent,
+                                              String processName, String taskStartTime) throws IOException {
+    // <tajo.task-history.path>/<yyyyMMdd>/tasks/<WORKER_HOST>_<WORKER_PORT>/<WORKER_HOST>_<WORKER_PORT>_<HH>_<seq>.hist
+    Path fileParent = new Path(parent, taskStartTime.substring(0, 8) + "/tasks/" + processName);
+
+    String hour = taskStartTime.substring(8, 10);
+    String filePrefix = processName + "_" + hour + "_";
+    int maxSeq = -1;
+
+    if (fs.exists(fileParent)) {
+      if (!fs.isDirectory(fileParent)) {
+        throw new IOException("Task history path is not directory: " + fileParent);
+      }
+
+      // finding largest sequence path
+      FileStatus[] files = fs.listStatus(fileParent);
+      if (files != null) {
+        for (FileStatus eachFile: files) {
+          String fileName = eachFile.getPath().getName();
+          if (!fileName.startsWith(filePrefix)) {
+            continue;
+          }
+
+          // What follows the prefix is expected to be "<seq>.hist".
+          String seqAndPostfix = fileName.substring(filePrefix.length());
+          int postfixIndex = seqAndPostfix.indexOf(".");
+          if (postfixIndex <= 0) {
+            continue;
+          }
+          try {
+            int fileSeq = Integer.parseInt(seqAndPostfix.substring(0, postfixIndex));
+            if (fileSeq > maxSeq) {
+              maxSeq = fileSeq;
+            }
+          } catch (NumberFormatException ignored) {
+            // Not one of our sequence files; skip it.
+          }
+        }
+      }
+    }
+
+    maxSeq++;
+    return new Path(fileParent, filePrefix + maxSeq + HISTORY_FILE_POSTFIX);
+  }
+
+  /**
+   * Mutable holder for one open history file: its path, its output stream and a timestamp.
+   */
+  class WriterHolder {
+    // Milliseconds since the epoch; set when a file is rolled or a task record is written.
+    long lastWritingTime;
+    // Path of the history file behind 'out'.
+    Path path;
+    // Stream of the history file; null while no file is open.
+    FSDataOutputStream out;
+  }
+}


Mime
View raw message