tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [4/5] tajo git commit: TAJO-1289: History reader fails to get the query information after a successful query execution. (jinho)
Date Mon, 26 Jan 2015 04:17:18 GMT
TAJO-1289: History reader fails to get the query information after a successful query execution.
(jinho)

Closes #356


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a15b5fab
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a15b5fab
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a15b5fab

Branch: refs/heads/index_support
Commit: a15b5fab7b1475f5cb4e5eba842f1c4b17166b58
Parents: 17c6dff
Author: jhkim <jhkim@apache.org>
Authored: Fri Jan 23 15:40:07 2015 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Fri Jan 23 15:40:07 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 +
 .../main/java/org/apache/tajo/util/Bytes.java   |   6 +-
 .../org/apache/tajo/master/QueryInProgress.java |   6 +-
 .../java/org/apache/tajo/master/QueryInfo.java  |   7 +-
 .../org/apache/tajo/master/QueryManager.java    |  25 +-
 .../apache/tajo/querymaster/QueryMaster.java    |   8 +-
 .../apache/tajo/util/history/HistoryReader.java |  33 +-
 .../apache/tajo/util/history/HistoryWriter.java | 361 ++++++++++++++-----
 .../apache/tajo/worker/TaskRunnerManager.java   |   1 +
 .../org/apache/tajo/client/TestTajoClient.java  |  16 +-
 .../util/history/TestHistoryWriterReader.java   |  19 +-
 12 files changed, 335 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4e14c93..6969bf1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -171,6 +171,9 @@ Release 0.10.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1289: History reader fails to get the query information after 
+    a successful query execution. (jinho)
+
     TAJO-1303: CDH cannot pass hadoop version check test.
     (Keuntae Park via jihun)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 74a9271..195743a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -268,6 +268,8 @@ public class TajoConf extends Configuration {
     HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"),
     HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"),
     HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7),
+    HISTORY_QUERY_REPLICATION("tajo.history.query.replication", 1, Validators.min("1")),
+    HISTORY_TASK_REPLICATION("tajo.history.task.replication", 1, Validators.min("1")),
 
     // Misc -------------------------------------------------------------------
     // Fragment

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
index 405ec2f..8191cb6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
@@ -55,14 +55,14 @@ import static com.google.common.base.Preconditions.*;
 public class Bytes {
   //HConstants.UTF8_ENCODING should be updated if this changed
   /** When we encode strings, we always specify UTF8 encoding */
-  private static final String UTF8_ENCODING = "UTF-8";
+  public static final String UTF8_ENCODING = "UTF-8";
 
   //HConstants.UTF8_CHARSET should be updated if this changed
   /** When we encode strings, we always specify UTF8 encoding */
-  private static final Charset UTF8_CHARSET = Charset.forName(UTF8_ENCODING);
+  public static final Charset UTF8_CHARSET = Charset.forName(UTF8_ENCODING);
 
   //HConstants.EMPTY_BYTE_ARRAY should be updated if this changed
-  private static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
+  public static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
 
   private static final Log LOG = LogFactory.getLog(Bytes.class);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index df461c8..7e2c05f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -103,7 +103,11 @@ public class QueryInProgress {
       RpcConnectionPool.getPool().closeConnection(queryMasterRpc);
     }
 
-    masterContext.getHistoryWriter().appendHistory(queryInfo);
+    try {
+      masterContext.getHistoryWriter().appendAndFlush(queryInfo);
+    } catch (Throwable e) {
+      LOG.warn(e);
+    }
   }
 
   public boolean startQueryMaster() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
index f902081..b11fd99 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
@@ -30,7 +30,7 @@ import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.util.history.History;
 
-public class QueryInfo implements GsonObject, History {
+public class QueryInfo implements GsonObject, History, Comparable<QueryInfo> {
   private QueryId queryId;
   @Expose
   private QueryContext context;
@@ -232,4 +232,9 @@ public class QueryInfo implements GsonObject, History {
 
     return builder.build();
   }
+
+  @Override
+  public int compareTo(QueryInfo o) {
+    return queryId.compareTo(o.queryId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index eebefa7..bc6f07b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -20,6 +20,8 @@ package org.apache.tajo.master;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.map.LRUMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,10 +38,12 @@ import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.session.Session;
+import org.apache.tajo.util.history.HistoryReader;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -59,6 +63,7 @@ public class QueryManager extends CompositeService {
   private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
 
   private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
+  private final LRUMap historyCache = new LRUMap(HistoryReader.DEFAULT_PAGE_SIZE);
 
   private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
   private AtomicLong maxExecutionTime = new AtomicLong();
@@ -121,19 +126,27 @@ public class QueryManager extends CompositeService {
 
   public synchronized Collection<QueryInfo> getFinishedQueries() {
     try {
-      return this.masterContext.getHistoryReader().getQueries(null);
+      Set<QueryInfo> result = Sets.newTreeSet();
+      result.addAll(this.masterContext.getHistoryReader().getQueries(null));
+      synchronized (historyCache) {
+        result.addAll(historyCache.values());
+      }
+      return result;
     } catch (Throwable e) {
       LOG.error(e);
       return Lists.newArrayList();
     }
   }
 
-
   public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
     try {
-      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+      QueryInfo queryInfo = (QueryInfo) historyCache.get(queryId);
+      if (queryInfo == null) {
+        queryInfo = this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+      }
+      return queryInfo;
     } catch (Throwable e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return null;
     }
   }
@@ -235,6 +248,10 @@ public class QueryManager extends CompositeService {
       }
 
       QueryInfo queryInfo = queryInProgress.getQueryInfo();
+      synchronized (historyCache) {
+        historyCache.put(queryInfo.getQueryId(), queryInfo);
+      }
+
       long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
       if (executionTime < minExecutionTime.get()) {
         minExecutionTime.set(executionTime);

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index be78fc3..1496b62 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -395,8 +395,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
       if (query != null) {
         QueryHistory queryHisory = query.getQueryHistory();
         if (queryHisory != null) {
-          query.context.getQueryMasterContext().getWorkerContext().
-              getTaskHistoryWriter().appendHistory(queryHisory);
+          try {
+            query.context.getQueryMasterContext().getWorkerContext().
+                getTaskHistoryWriter().appendAndFlush(queryHisory);
+          } catch (Throwable e) {
+            LOG.warn(e);
+          }
         }
       }
       if(workerContext.isYarnContainerMode()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index c3f0087..f4719b2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -24,10 +24,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
 import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.util.Bytes;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -56,30 +58,33 @@ public class HistoryReader {
     List<QueryInfo> queryInfos = new ArrayList<QueryInfo>();
 
     FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf);
-    if (!fs.exists(historyParentPath)) {
+    try {
+      if (!fs.exists(historyParentPath)) {
+        return queryInfos;
+      }
+    } catch (Throwable e){
       return queryInfos;
     }
+
     FileStatus[] files = fs.listStatus(historyParentPath);
     if (files == null || files.length == 0) {
       return queryInfos;
     }
 
     for (FileStatus eachDateFile: files) {
-      if (eachDateFile.isFile()) {
+      Path queryListPath = new Path(eachDateFile.getPath(), HistoryWriter.QUERY_LIST);
+      if (eachDateFile.isFile() || !fs.exists(queryListPath)) {
         continue;
       }
-      FileStatus[] dateFiles = fs.listStatus(new Path(eachDateFile.getPath(), HistoryWriter.QUERY_LIST));
+
+      FileStatus[] dateFiles = fs.listStatus(queryListPath);
       if (dateFiles == null || dateFiles.length == 0) {
         continue;
       }
 
       for (FileStatus eachFile: dateFiles) {
-        if (eachFile.isDirectory()) {
-          continue;
-        }
-
         Path path = eachFile.getPath();
-        if (!path.getName().endsWith(HistoryWriter.HISTORY_FILE_POSTFIX)) {
+        if (eachFile.isDirectory() || !path.getName().endsWith(HistoryWriter.HISTORY_FILE_POSTFIX)) {
           continue;
         }
 
@@ -94,7 +99,7 @@ public class HistoryReader {
               buf = new byte[length];
             }
             in.readFully(buf, 0, length);
-            String queryInfoJson = new String(buf, 0, length);
+            String queryInfoJson = new String(buf, 0, length, Bytes.UTF8_CHARSET);
             QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson);
             if (keyword != null) {
               if (queryInfo.getSql().indexOf(keyword) >= 0) {
@@ -105,10 +110,10 @@ public class HistoryReader {
             }
           }
         } catch (EOFException e) {
-        } catch (Exception e) {
-          LOG.error("Reading error:" + path + ", " +e.getMessage(), e);
+        } catch (Throwable e) {
+          LOG.warn("Reading error:" + path + ", " +e.getMessage());
         } finally {
-          in.close();
+          IOUtils.cleanup(LOG, in);
         }
       }
     }
@@ -178,7 +183,7 @@ public class HistoryReader {
 
       in.readFully(buf, 0, buf.length);
 
-      return QueryHistory.fromJson(new String(buf));
+      return QueryHistory.fromJson(new String(buf, Bytes.UTF8_CHARSET));
     } finally {
       if (in != null) {
         in.close();
@@ -210,7 +215,7 @@ public class HistoryReader {
 
       in.readFully(buf, 0, buf.length);
 
-      return StageHistory.fromJsonTasks(new String(buf));
+      return StageHistory.fromJsonTasks(new String(buf, Bytes.UTF8_CHARSET));
     } finally {
       if (in != null) {
         in.close();

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index 9eb58da..3fea3ef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -18,19 +18,26 @@
 
 package org.apache.tajo.util.history;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.util.Bytes;
 import org.apache.tajo.worker.TaskHistory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -50,7 +57,8 @@ public class HistoryWriter extends AbstractService {
   public static final String QUERY_DETAIL = "query-detail";
   public static final String HISTORY_FILE_POSTFIX = ".hist";
 
-  private final LinkedBlockingQueue<History> historyQueue = new LinkedBlockingQueue<History>();
+  private final LinkedBlockingQueue<WriterFuture<WriterHolder>>
+      historyQueue = new LinkedBlockingQueue<WriterFuture<WriterHolder>>();
   // key: yyyyMMddHH
   private Map<String, WriterHolder> taskWriters = new HashMap<String, WriterHolder>();
 
@@ -65,6 +73,8 @@ public class HistoryWriter extends AbstractService {
   private TajoConf tajoConf;
   private HistoryCleaner historyCleaner;
   private boolean isMaster;
+  private short queryReplication;
+  private short taskReplication;
 
   public HistoryWriter(String processName, boolean isMaster) {
     super(HistoryWriter.class.getName() + ":" + processName);
@@ -79,31 +89,25 @@ public class HistoryWriter extends AbstractService {
     taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
     writerThread = new WriterThread();
     historyCleaner = new HistoryCleaner(tajoConf, isMaster);
+    queryReplication = (short) tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_REPLICATION);
+    taskReplication = (short) tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_TASK_REPLICATION);
     super.serviceInit(conf);
   }
 
   @Override
   public void serviceStop() throws Exception {
+    if(stopped.getAndSet(true)){
+      return;
+    }
+
     for (WriterHolder eachWriter : taskWriters.values()) {
-      if (eachWriter.out != null) {
-        try {
-          eachWriter.out.close();
-        } catch (Exception err) {
-          LOG.error(err.getMessage(), err);
-        }
-      }
+      IOUtils.cleanup(LOG, eachWriter);
     }
+
     taskWriters.clear();
-    stopped.set(true);
     writerThread.interrupt();
 
-    if (querySummaryWriter != null && querySummaryWriter.out != null) {
-      try {
-        querySummaryWriter.out.close();
-      } catch (Exception err) {
-        LOG.error(err.getMessage(), err);
-      }
-    }
+    IOUtils.cleanup(LOG, querySummaryWriter);
 
     if (historyCleaner != null) {
       historyCleaner.doStop();
@@ -117,10 +121,53 @@ public class HistoryWriter extends AbstractService {
     historyCleaner.start();
   }
 
-  public void appendHistory(History history) {
-    synchronized (historyQueue) {
-      historyQueue.add(history);
-      historyQueue.notifyAll();
+  /* asynchronously append to history file */
+  public WriterFuture<WriterHolder> appendHistory(History history) {
+    WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history);
+    historyQueue.add(future);
+    return future;
+  }
+
+  /* asynchronously flush to history file */
+  public synchronized WriterFuture<WriterHolder> appendAndFlush(History history) {
+    WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history) {
+      public void done(WriterHolder holder) {
+        try {
+          if (holder != null) holder.flush();
+          super.done(holder);
+        } catch (IOException e) {
+          super.failed(e);
+        }
+      }
+    };
+    historyQueue.add(future);
+    synchronized (writerThread) {
+      writerThread.notifyAll();
+    }
+    return future;
+  }
+
+  /* synchronously flush to history file */
+  public synchronized void appendAndSync(History history)
+      throws TimeoutException, InterruptedException, IOException {
+
+    WriterFuture<WriterHolder> future = appendAndFlush(history);
+
+    future.get(5, TimeUnit.SECONDS);
+    if(!future.isSucceed()){
+      throw new IOException(future.getError());
+    }
+  }
+
+  /* Flushing the buffer */
+  public synchronized void flushTaskHistories() {
+    if (historyQueue.size() > 0) {
+      synchronized (writerThread) {
+        writerThread.needTaskFlush.set(true);
+        writerThread.notifyAll();
+      }
+    } else {
+      writerThread.flushTaskHistories();
     }
   }
 
@@ -146,7 +193,7 @@ public class HistoryWriter extends AbstractService {
   public static Path getQueryHistoryFilePath(Path historyParentPath, String queryId) {
     SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
 
-    Path datePath = null;
+    Path datePath;
     try {
       String[] tokens = queryId.split("_");
       //q_1412483083972_0005 = q_<timestamp>_<seq>
@@ -162,31 +209,27 @@ public class HistoryWriter extends AbstractService {
   }
 
   class WriterThread extends Thread {
+    private AtomicBoolean needTaskFlush = new AtomicBoolean(false);
+
     public void run() {
-      LOG.info("HistoryWriter_"+ processName + " started.");
+      LOG.info("HistoryWriter_" + processName + " started.");
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
       while (!stopped.get()) {
-        List<History> histories = new ArrayList<History>();
-        synchronized (historyQueue) {
-          historyQueue.drainTo(histories);
-          if (histories.isEmpty()) {
-            try {
-              historyQueue.wait(60 * 1000);
-            } catch (InterruptedException e) {
-              if (stopped.get()) {
-                break;
-              }
-            }
-          }
-        }
-        if (stopped.get()) {
-          break;
+        List<WriterFuture<WriterHolder>> histories = Lists.newArrayList();
+
+        try {
+          drainHistory(histories, 100, 1000);
+        } catch (InterruptedException e) {
+          if (stopped.get()) break;
         }
+
         try {
           if (!histories.isEmpty()) {
             writeHistory(histories);
+          } else {
+            continue;
           }
-        } catch (Exception e) {
+        } catch (Throwable e) {
           LOG.error(e.getMessage(), e);
         }
 
@@ -207,60 +250,91 @@ public class HistoryWriter extends AbstractService {
           }
 
           for (String eachWriterTime : closingTargets) {
-            WriterHolder writerHolder = null;
+            WriterHolder writerHolder;
             synchronized (taskWriters) {
               writerHolder = taskWriters.remove(eachWriterTime);
             }
+
             if (writerHolder != null) {
               LOG.info("Closing task history file: " + writerHolder.path);
-              if (writerHolder.out != null) {
-                try {
-                  writerHolder.out.close();
-                } catch (IOException e) {
-                  LOG.error(e.getMessage(), e);
-                }
+              IOUtils.cleanup(LOG, writerHolder);
+            }
+          }
+        }
+      }
+      LOG.info("HistoryWriter_" + processName + " stopped.");
+    }
+
+    private int drainHistory(Collection<WriterFuture<WriterHolder>> buffer, int numElements,
+                             long timeoutMillis) throws InterruptedException {
+
+      long deadline = System.currentTimeMillis() + timeoutMillis;
+      int added = 0;
+      while (added < numElements) {
+        added += historyQueue.drainTo(buffer, numElements - added);
+        if (added < numElements) { // not enough elements immediately available; will have to wait
+          if (deadline <= System.currentTimeMillis()) {
+            break;
+          } else {
+            synchronized (writerThread) {
+              writerThread.wait(deadline - System.currentTimeMillis());
+              if (deadline > System.currentTimeMillis()) {
+                added += historyQueue.drainTo(buffer, numElements - added);
+                break;
               }
             }
           }
         }
       }
-      LOG.info("HistoryWriter_"+ processName + " stopped.");
+      return added;
     }
 
-    public void writeHistory(List<History> histories) {
+    private List<WriterFuture<WriterHolder>> writeHistory(List<WriterFuture<WriterHolder>> histories) {
+
       if (histories.isEmpty()) {
-        return;
+        return histories;
       }
-      for (History eachHistory : histories) {
-        switch(eachHistory.getHistoryType()) {
+
+      for (WriterFuture<WriterHolder> future : histories) {
+        History history = future.getHistory();
+        switch (history.getHistoryType()) {
           case TASK:
             try {
-              writeTaskHistory((TaskHistory) eachHistory);
-            } catch (Exception e) {
+              future.done(writeTaskHistory((TaskHistory) history));
+            } catch (Throwable e) {
               LOG.error("Error while saving task history: " +
-                  ((TaskHistory) eachHistory).getTaskAttemptId() + ":" + e.getMessage(), e);
+                  ((TaskHistory) history).getTaskAttemptId() + ":" + e.getMessage(), e);
+              future.failed(e);
             }
             break;
           case QUERY:
             try {
-              writeQueryHistory((QueryHistory) eachHistory);
-            } catch (Exception e) {
+              writeQueryHistory((QueryHistory) history);
+              future.done(null);
+            } catch (Throwable e) {
               LOG.error("Error while saving query history: " +
-                  ((QueryHistory) eachHistory).getQueryId() + ":" + e.getMessage(), e);
+                  ((QueryHistory) history).getQueryId() + ":" + e.getMessage(), e);
+              future.failed(e);
             }
             break;
           case QUERY_SUMMARY:
             try {
-              writeQuerySummary((QueryInfo) eachHistory);
-            } catch (Exception e) {
+              future.done(writeQuerySummary((QueryInfo) history));
+            } catch (Throwable e) {
               LOG.error("Error while saving query summary: " +
-                  ((QueryInfo) eachHistory).getQueryId() + ":" + e.getMessage(), e);
+                  ((QueryInfo) history).getQueryId() + ":" + e.getMessage(), e);
+              future.failed(e);
             }
             break;
           default:
-            LOG.warn("Wrong history type: " + eachHistory.getHistoryType());
+            LOG.warn("Wrong history type: " + history.getHistoryType());
         }
       }
+
+      if(needTaskFlush.getAndSet(false)){
+        flushTaskHistories();
+      }
+      return histories;
     }
 
     private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Exception {
@@ -283,16 +357,10 @@ public class HistoryWriter extends AbstractService {
       FSDataOutputStream out = null;
       try {
         LOG.info("Saving query summary: " + queryHistoryFile);
-        out = fs.create(queryHistoryFile);
-        out.write(queryHistory.toJson().getBytes());
+        out = fs.create(queryHistoryFile, queryReplication);
+        out.write(queryHistory.toJson().getBytes(Bytes.UTF8_CHARSET));
       } finally {
-        if (out != null) {
-          try {
-            out.close();
-          } catch (Exception err) {
-            LOG.error(err.getMessage(), err);
-          }
-        }
+        IOUtils.cleanup(LOG, out);
       }
 
       if (queryHistory.getStageHistories() != null) {
@@ -300,24 +368,18 @@ public class HistoryWriter extends AbstractService {
           Path path = new Path(queryHistoryFile.getParent(), stageHistory.getExecutionBlockId() + HISTORY_FILE_POSTFIX);
           out = null;
           try {
-            out = fs.create(path);
-            out.write(stageHistory.toTasksJson().getBytes());
+            out = fs.create(path, queryReplication);
+            out.write(stageHistory.toTasksJson().getBytes(Bytes.UTF8_CHARSET));
             LOG.info("Saving query unit: " + path);
           } finally {
-            if (out != null) {
-              try {
-                out.close();
-              } catch (Exception err) {
-                LOG.error(err.getMessage(), err);
-              }
-            }
+            IOUtils.cleanup(LOG, out);
           }
         }
       }
     }
 
-    private synchronized void writeQuerySummary(QueryInfo queryInfo) throws Exception {
-      if(stopped.get()) return;
+    private synchronized WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception {
+      if(stopped.get()) return null;
 
         // writing to HDFS and rolling hourly
       if (querySummaryWriter == null) {
@@ -327,18 +389,21 @@ public class HistoryWriter extends AbstractService {
         if (querySummaryWriter.out == null) {
           rollingQuerySummaryWriter();
         } else if (System.currentTimeMillis() - querySummaryWriter.lastWritingTime >= 60 * 60 * 1000) {
-          if (querySummaryWriter.out != null) {
-            LOG.info("Close query history file: " + querySummaryWriter.path);
-            querySummaryWriter.out.close();
-          }
+          LOG.info("Close query history file: " + querySummaryWriter.path);
+          IOUtils.cleanup(LOG, querySummaryWriter);
           rollingQuerySummaryWriter();
         }
       }
-      byte[] jsonBytes = ("\n" + queryInfo.toJson() + "\n").getBytes();
-
-      querySummaryWriter.out.writeInt(jsonBytes.length);
-      querySummaryWriter.out.write(jsonBytes);
-      querySummaryWriter.out.hflush();
+      byte[] jsonBytes = ("\n" + queryInfo.toJson() + "\n").getBytes(Bytes.UTF8_CHARSET);
+      try {
+        querySummaryWriter.out.writeInt(jsonBytes.length);
+        querySummaryWriter.out.write(jsonBytes);
+      } catch (IOException ie) {
+        IOUtils.cleanup(LOG, querySummaryWriter);
+        querySummaryWriter.out = null;
+        throw ie;
+      }
+      return querySummaryWriter;
     }
 
     private synchronized void rollingQuerySummaryWriter() throws Exception {
@@ -359,10 +424,22 @@ public class HistoryWriter extends AbstractService {
       querySummaryWriter.path = historyFile;
       querySummaryWriter.lastWritingTime = System.currentTimeMillis();
       LOG.info("Create query history file: " + historyFile);
-      querySummaryWriter.out = fs.create(historyFile);
+      querySummaryWriter.out = fs.create(historyFile, queryReplication);
+    }
+
+    private void flushTaskHistories() {
+      synchronized (taskWriters) {
+        for (WriterHolder holder : taskWriters.values()) {
+          try {
+            holder.flush();
+          } catch (IOException e) {
+            LOG.warn(e);
+          }
+        }
+      }
     }
 
-    private synchronized void writeTaskHistory(TaskHistory taskHistory) throws Exception {
+    private synchronized WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception {
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
 
       String taskStartTime = df.format(new Date(taskHistory.getStartTime()));
@@ -378,11 +455,17 @@ public class HistoryWriter extends AbstractService {
       writerHolder.lastWritingTime = System.currentTimeMillis();
 
       if (writerHolder.out != null) {
-        byte[] taskHistoryBytes = taskHistory.getProto().toByteArray();
-        writerHolder.out.writeInt(taskHistoryBytes.length);
-        writerHolder.out.write(taskHistoryBytes);
-        writerHolder.out.flush();
+        try {
+          byte[] taskHistoryBytes = taskHistory.getProto().toByteArray();
+          writerHolder.out.writeInt(taskHistoryBytes.length);
+          writerHolder.out.write(taskHistoryBytes);
+        } catch (IOException ie) {
+          taskWriters.remove(taskStartTime);
+          IOUtils.cleanup(LOG, writerHolder);
+          throw ie;
+        }
       }
+      return writerHolder;
     }
 
     private FSDataOutputStream createTaskHistoryFile(String taskStartTime, WriterHolder writerHolder) throws IOException {
@@ -395,7 +478,7 @@ public class HistoryWriter extends AbstractService {
         }
       }
       writerHolder.path = path;
-      return fs.create(path, false);
+      return fs.create(path, false, 4096, taskReplication, fs.getDefaultBlockSize(path));
     }
   }
 
@@ -444,9 +527,89 @@ public class HistoryWriter extends AbstractService {
     return new Path(fileParent, processName + "_" + hour + "_" + maxSeq + HISTORY_FILE_POSTFIX);
   }
 
-  class WriterHolder {
+  static class WriterHolder implements Closeable {
     long lastWritingTime;
     Path path;
     FSDataOutputStream out;
+
+    @Override
+    public synchronized void close() throws IOException {
+      if (out != null) out.close();
+    }
+
+    /*
+     * Sync buffered data to DataNodes or disks (flush to disk devices).
+     */
+    private synchronized void flush() throws IOException {
+      if (out != null) out.hsync();
+    }
+  }
+
+  static class WriterFuture<T> implements Future<T> {
+    private boolean done = false;
+    private T result;
+    private History history;
+    private Throwable error;
+    private CountDownLatch latch = new CountDownLatch(1);
+
+    public WriterFuture(History history) {
+      this.history = history;
+    }
+
+    private History getHistory() {
+      return history;
+    }
+
+    public void done(T t) {
+      this.result = t;
+      this.done = true;
+      this.latch.countDown();
+    }
+
+    public void failed(Throwable e) {
+      this.error = e;
+      done(null);
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      // TODO - to be implemented
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isCancelled() {
+      // TODO - to be implemented
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isDone() {
+      return done;
+    }
+
+    public boolean isSucceed() {
+      return error == null;
+    }
+
+    public Throwable getError() {
+      return error;
+    }
+
+    @Override
+    public T get() throws InterruptedException {
+      this.latch.await();
+      return result;
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit)
+        throws InterruptedException, TimeoutException {
+      if (latch.await(timeout, unit)) {
+        return result;
+      } else {
+        throw new TimeoutException();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index f837b11..3c1fcc5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -192,6 +192,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
           TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId());
           executionBlockContext.reportExecutionBlock(event.getExecutionBlockId());
           workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId());
+          workerContext.getTaskHistoryWriter().flushTaskHistories();
         } catch (IOException e) {
           LOG.fatal(e.getMessage(), e);
           throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 113288a..63ea8b9 100644
--- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -638,9 +638,8 @@ public class TestTajoClient {
     QueryId queryId = new QueryId(response.getQueryId());
 
     try {
-      long startTime = System.currentTimeMillis();
       while (true) {
-        Thread.sleep(5 * 1000);
+        Thread.sleep(100);
 
         List<ClientProtos.BriefQueryInfo> finishedQueries = client.getFinishedQueryList();
         boolean finished = false;
@@ -656,9 +655,6 @@ public class TestTajoClient {
         if (finished) {
           break;
         }
-        if(System.currentTimeMillis() - startTime > 20 * 1000) {
-          fail("Too long time execution query");
-        }
       }
 
       QueryStatus queryStatus = client.getQueryStatus(queryId);
@@ -755,7 +751,7 @@ public class TestTajoClient {
     assertEquals(expected, resultDatas);
   }
 
-  @Test
+  @Test(timeout = 30000)
   public void testGetQueryInfoAndHistory() throws Exception {
     String sql = "select count(*) from lineitem";
     ClientProtos.SubmitQueryResponse response = client.executeQuery(sql);
@@ -763,8 +759,7 @@ public class TestTajoClient {
     assertNotNull(response);
     QueryId queryId = new QueryId(response.getQueryId());
 
-    QueryInfoProto queryInfo = null;
-    long startTime = System.currentTimeMillis();
+    QueryInfoProto queryInfo;
     while (true) {
       queryInfo = client.getQueryInfo(queryId);
 
@@ -772,12 +767,7 @@ public class TestTajoClient {
         break;
       }
       Thread.sleep(100);
-
-      if (System.currentTimeMillis() - startTime > 30 * 1000) {
-        fail("Too long running query");
-      }
     }
-    Thread.sleep(5 * 1000);
 
     assertNotNull(queryInfo);
     assertEquals(queryId.toString(), queryInfo.getQueryId());

http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
index 45282aa..f442bde 100644
--- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
+++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -41,7 +41,6 @@ import java.util.Date;
 import java.util.List;
 
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertTrue;
 
 public class TestHistoryWriterReader extends QueryTestCaseBase {
   public static final String HISTORY_DIR = "/tmp/tajo-test-history";
@@ -71,17 +70,13 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
       queryInfo1.setStartTime(startTime);
       queryInfo1.setProgress(1.0f);
       queryInfo1.setQueryState(QueryState.QUERY_SUCCEEDED);
+      writer.appendHistory(queryInfo1);
 
       QueryInfo queryInfo2 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 2));
       queryInfo2.setStartTime(startTime);
       queryInfo2.setProgress(0.5f);
       queryInfo2.setQueryState(QueryState.QUERY_FAILED);
-
-      writer.appendHistory(queryInfo1);
-      writer.appendHistory(queryInfo2);
-
-      // HistoryWriter writes asynchronous.
-      Thread.sleep(5 * 1000);
+      writer.appendAndSync(queryInfo2);
 
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
       Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
@@ -144,10 +139,7 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
       }
       queryHistory.setStageHistories(stages);
 
-      writer.appendHistory(queryHistory);
-
-      // HistoryWriter writes asynchronous.
-      Thread.sleep(5 * 1000);
+      writer.appendAndSync(queryHistory);
 
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
       Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
@@ -217,10 +209,7 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
       TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
       org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
           id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
-      writer.appendHistory(taskHistory2);
-
-      // HistoryWriter writes asynchronous.
-      Thread.sleep(5 * 1000);
+      writer.appendAndSync(taskHistory2);
 
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
       String startDate = df.format(new Date(startTime));


Mime
View raw message