tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1712: querytasks.jsp throws NPE occasionally when tasks are running. (jinho)
Date Wed, 29 Jul 2015 07:54:00 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 33f4b7a06 -> c46dc1a64


TAJO-1712: querytasks.jsp throws NPE occasionally when tasks are running. (jinho)

Closes #657


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c46dc1a6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c46dc1a6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c46dc1a6

Branch: refs/heads/master
Commit: c46dc1a64f9528adc5f2a54db3bc6aa247252b84
Parents: 33f4b7a
Author: Jinho Kim <jhkim@apache.org>
Authored: Wed Jul 29 16:53:11 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Wed Jul 29 16:53:11 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/tajo/master/QueryManager.java    |  44 ++++++--
 .../main/java/org/apache/tajo/util/JSPUtil.java |  19 ++++
 .../apache/tajo/util/history/HistoryReader.java | 100 ++++++++++++-------
 .../src/main/resources/webapps/admin/query.jsp  |  13 +--
 .../resources/webapps/worker/querytasks.jsp     |  18 ++--
 .../util/history/TestHistoryWriterReader.java   |  12 ++-
 7 files changed, 152 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 721f8ff..1389b6f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -200,6 +200,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1712: querytasks.jsp throws NPE occasionally when tasks are running.
+    (jinho)
+
     TAJO-1716: Repartitioner.makeEvenDistributedFetchImpl() does not distribute 
     fetches evenly. (jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 2578c9f..8be298e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.map.LRUMap;
@@ -44,10 +45,7 @@ import org.apache.tajo.session.Session;
 import org.apache.tajo.util.history.HistoryReader;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -119,17 +117,51 @@ public class QueryManager extends CompositeService {
     return Collections.unmodifiableCollection(runningQueries.values());
   }
 
+  @Deprecated
   public Collection<QueryInfo> getFinishedQueries() {
     Set<QueryInfo> result = Sets.newTreeSet();
+
     synchronized (historyCache) {
       result.addAll(historyCache.values());
     }
 
     try {
       synchronized (this) {
-        result.addAll(this.masterContext.getHistoryReader().getQueries(null));
+        result.addAll(this.masterContext.getHistoryReader().getQueriesInHistory());
+      }
+      return result;
+    } catch (Throwable e) {
+      LOG.error(e, e);
+      return result;
+    }
+  }
+
+  /**
+   * Get query history in cache or persistent storage
+   */
+  public Collection<QueryInfo> getFinishedQueries(int page, int size) {
+    TreeSet<QueryInfo> result = Sets.newTreeSet();
+    if(page <= 0 || size <= 0) {
+      return result;
+    }
+
+    List<QueryInfo> cacheList = Lists.newArrayList();
+    synchronized (historyCache) {
+      // request size fits in cache
+      if (page == 1 && size <= historyCache.size()) {
+        cacheList.addAll(historyCache.values());
       }
+    }
+
+    if (cacheList.size() > 0) {
+      result.addAll(cacheList.subList(0, size));
       return result;
+    }
+
+    try {
+      synchronized (this) {
+        return this.masterContext.getHistoryReader().getQueriesInHistory(page, size);
+      }
     } catch (Throwable e) {
       LOG.error(e, e);
       return result;
@@ -144,7 +176,7 @@ public class QueryManager extends CompositeService {
       }
       if (queryInfo == null) {
         synchronized (this) {
-          queryInfo = this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+          queryInfo = this.masterContext.getHistoryReader().getQueryByQueryId(queryId);
         }
       }
       return queryInfo;

http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 7641320..1d93c3c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -355,6 +355,25 @@ public class JSPUtil {
     return sb.toString();
   }
 
+  public static String getPageNavigation(int currentPage, boolean next, String url) {
+    StringBuilder sb = new StringBuilder();
+    if (currentPage > 1) {
+      sb.append("<a href='").append(url)
+          .append("&page=").append(currentPage - 1).append("'>")
+          .append("&lt;prev</a>");
+      sb.append("&nbsp;&nbsp;");
+    }
+
+    sb.append(currentPage);
+
+    if(next) {
+      sb.append("&nbsp;&nbsp;").append("<a href='").append(url)
+          .append("&page=").append(currentPage + 1).append("'>")
+          .append("next&gt;</a>");
+    }
+    return sb.toString();
+  }
+
   public static <T extends Object> List<T> getPageNavigationList(List<T> originList, int page, int pageSize) {
     if (originList == null) {
       return new ArrayList<T>();

http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index 2acd5f6..27d823e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.util.history;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -25,9 +26,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.ResourceProtos.TaskHistoryProto;
 import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ResourceProtos.TaskHistoryProto;
 import org.apache.tajo.master.QueryInfo;
 import org.apache.tajo.util.Bytes;
 
@@ -54,7 +57,28 @@ public class HistoryReader {
     taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
   }
 
-  public List<QueryInfo> getQueries(String keyword) throws IOException {
+  @Deprecated
+  public List<QueryInfo> getQueriesInHistory() throws IOException {
+    return getQueriesInHistory(-1, Integer.MAX_VALUE);
+  }
+
+  public List<QueryInfo> getQueriesInHistory(int page, int size) throws IOException {
+    List<QueryInfo> queryList = getQueryInfoInHistory(page, size, null);
+    if (queryList.size() > size) {
+      queryList = queryList.subList(0, size);
+    }
+
+    Collections.sort(queryList, new Comparator<QueryInfo>() {
+      @Override
+      public int compare(QueryInfo query1, QueryInfo query2) {
+        return query2.compareTo(query1);
+      }
+    });
+
+    return queryList;
+  }
+
+  private List<QueryInfo> getQueryInfoInHistory(int page, int size, @Nullable QueryId queryId) throws IOException {
     List<QueryInfo> queryInfos = new ArrayList<QueryInfo>();
 
     FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf);
@@ -62,7 +86,7 @@ public class HistoryReader {
       if (!fs.exists(historyParentPath)) {
         return queryInfos;
       }
-    } catch (Throwable e){
+    } catch (Throwable e) {
       return queryInfos;
     }
 
@@ -71,7 +95,11 @@ public class HistoryReader {
       return queryInfos;
     }
 
-    for (FileStatus eachDateFile: files) {
+    int startIndex = page < 1 ? page : (page - 1) * size; // set index to last index of previous page
+    int currentIndex = 0;
+
+    ArrayUtils.reverse(files);
+    for (FileStatus eachDateFile : files) {
       Path queryListPath = new Path(eachDateFile.getPath(), HistoryWriter.QUERY_LIST);
       if (eachDateFile.isFile() || !fs.exists(queryListPath)) {
         continue;
@@ -82,52 +110,70 @@ public class HistoryReader {
         continue;
       }
 
-      for (FileStatus eachFile: dateFiles) {
+      ArrayUtils.reverse(dateFiles);
+      for (FileStatus eachFile : dateFiles) {
         Path path = eachFile.getPath();
         if (eachFile.isDirectory() || !path.getName().endsWith(HistoryWriter.HISTORY_FILE_POSTFIX)) {
           continue;
         }
 
         FSDataInputStream in = null;
+        long totalLength = 0;
+
         try {
           in = fs.open(path);
 
-          byte[] buf = new byte[100 * 1024];
-          while (true) {
+          while (totalLength < eachFile.getLen()) {
             int length = in.readInt();
-            if (length > buf.length) {
-              buf = new byte[length];
+            totalLength += 4;
+
+            currentIndex++;
+            //skip previous page
+            if (startIndex >= currentIndex) {
+              totalLength += in.skipBytes(length);
+              continue;
             }
+
+            byte[] buf = new byte[length];
             in.readFully(buf, 0, length);
+            totalLength += length;
+
             String queryInfoJson = new String(buf, 0, length, Bytes.UTF8_CHARSET);
             QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson);
-            if (keyword != null) {
-              if (queryInfo.getSql().indexOf(keyword) >= 0) {
+
+            if (queryId != null) {
+              if (queryInfo.getQueryId().equals(queryId)) {
                 queryInfos.add(queryInfo);
+                return queryInfos;
               }
             } else {
               queryInfos.add(queryInfo);
             }
           }
-        } catch (EOFException e) {
         } catch (Throwable e) {
-          LOG.warn("Reading error:" + path + ", " +e.getMessage());
+          LOG.warn("Reading error:" + path + ", " + e.getMessage());
         } finally {
           IOUtils.cleanup(LOG, in);
         }
-      }
-    }
 
-    Collections.sort(queryInfos, new Comparator<QueryInfo>() {
-      @Override
-      public int compare(QueryInfo query1, QueryInfo query2) {
-        return query2.getQueryIdStr().toString().compareTo(query1.getQueryIdStr().toString());
+        if (queryInfos.size() >= size) {
+          return queryInfos;
+        }
       }
-    });
+    }
 
     return queryInfos;
   }
 
+  public QueryInfo getQueryByQueryId(QueryId queryId) throws IOException {
+    List<QueryInfo> queryInfoList = getQueryInfoInHistory(-1, Integer.MAX_VALUE, queryId);
+    if (queryInfoList.size() > 0) {
+      return queryInfoList.get(0);
+    } else {
+      return null;
+    }
+  }
+
   private Path getQueryHistoryFilePath(String queryId, long startTime) throws IOException {
     if (startTime == 0) {
       String[] tokens = queryId.split("_");
@@ -298,18 +344,4 @@ public class HistoryReader {
     }
     return null;
   }
-
-  public QueryInfo getQueryInfo(String queryId) throws IOException {
-    List<QueryInfo> queries = getQueries(null);
-
-    if (queries != null) {
-      for (QueryInfo queryInfo: queries) {
-        if (queryId.equals(queryInfo.getQueryId().toString())) {
-          return queryInfo;
-        }
-      }
-    }
-
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 0701e34..7422a03 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -59,14 +59,9 @@
     }
   }
 
-  List<QueryInfo> allFinishedQueries = new ArrayList<QueryInfo>(master.getContext().getQueryJobManager().getFinishedQueries());
-  Collections.sort(allFinishedQueries, java.util.Collections.reverseOrder());
-
-  int numOfFinishedQueries = allFinishedQueries.size();
-  int totalPage = numOfFinishedQueries % pageSize == 0 ?
-      numOfFinishedQueries / pageSize : numOfFinishedQueries / pageSize + 1;
-
-  List<QueryInfo> finishedQueries = JSPUtil.getPageNavigationList(allFinishedQueries, currentPage, pageSize);
+  List<QueryInfo> finishedQueries = new ArrayList<QueryInfo>(
+          master.getContext().getQueryJobManager().getFinishedQueries(currentPage, pageSize));
+  Collections.sort(finishedQueries, java.util.Collections.reverseOrder());
 
   SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
@@ -220,7 +215,7 @@
     %>
   </table>
   <div align="center">
-    <%=JSPUtil.getPageNavigation(currentPage, totalPage, "query.jsp?pageSize=" + pageSize)%>
+      <%=JSPUtil.getPageNavigation(currentPage, finishedQueries.size() == pageSize, "query.jsp?pageSize=" + pageSize)%>
   </div>
   <p/>
 <%

http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index 2c32006..caaec35 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -111,16 +111,18 @@
 
   float totalProgress = 0.0f;
   for(Task eachTask : allTasks) {
-    totalProgress += eachTask.getLastAttempt() != null ? eachTask.getLastAttempt().getProgress(): 0.0f;
+
     numShuffles = eachTask.getShuffleOutpuNum();
-    if (eachTask.getLastAttempt() != null) {
-      TableStats inputStats = eachTask.getLastAttempt().getInputStats();
+    TaskAttempt lastAttempt = eachTask.getLastAttempt();
+    if (lastAttempt != null) {
+      totalProgress +=  lastAttempt.getProgress();
+      TableStats inputStats = lastAttempt.getInputStats();
       if (inputStats != null) {
         totalInputBytes += inputStats.getNumBytes();
         totalReadBytes += inputStats.getReadBytes();
         totalReadRows += inputStats.getNumRows();
       }
-      TableStats outputStats = eachTask.getLastAttempt().getResultStats();
+      TableStats outputStats = lastAttempt.getResultStats();
       if (outputStats != null) {
         totalWriteBytes += outputStats.getNumBytes();
         totalWriteRows += outputStats.getNumRows();
@@ -231,18 +233,20 @@
             "&taskSeq=" + taskSeq + "&sort=" + sort + "&sortOrder=" + sortOrder;
 
     TaskAttempt lastAttempt = eachTask.getLastAttempt();
-    String taskHost = lastAttempt == null ? "-" : lastAttempt.getWorkerConnectionInfo().getHost();
-    if(lastAttempt != null) {
+    String taskHost = "-";
+    float progress = 0.0f;
+    if(lastAttempt != null && lastAttempt.getWorkerConnectionInfo() != null) {
       WorkerConnectionInfo conn = lastAttempt.getWorkerConnectionInfo();
       TaskAttemptId lastAttemptId = lastAttempt.getId();
       taskHost = "<a href='http://" + conn.getHost() + ":" + conn.getHttpInfoPort() + "/taskdetail.jsp?taskAttemptId=" + lastAttemptId + "'>" + conn.getHost() + "</a>";
+      progress = eachTask.getLastAttempt().getProgress();
     }
 %>
     <tr>
       <td align='center'><%=rowNo%></td>
       <td><a href="<%=taskDetailUrl%>"><%=eachTask.getId()%></a></td>
       <td align='center'><%=eachTask.getLastAttemptStatus()%></td>
-      <td align='center'><%=JSPUtil.percentFormat(eachTask.getLastAttempt().getProgress())%>%</td>
+      <td align='center'><%=JSPUtil.percentFormat(progress)%>%</td>
       <td align='center'><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td>
       <td align='right'><%=eachTask.getLaunchTime() == 0 ? "-" : eachTask.getRunningTime() + " ms"%></td>
       <td align='center'><%=eachTask.getRetryCount()%></td>

http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
index f442bde..aee418c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
+++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -90,7 +90,7 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
       assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
 
       HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
-      List<QueryInfo> queryInfos = reader.getQueries(null);
+      List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, 2);
       assertNotNull(queryInfos);
       assertEquals(2, queryInfos.size());
 
@@ -99,10 +99,20 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
       assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
       assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);
 
+      foundQueryInfo = reader.getQueryByQueryId(queryInfo2.getQueryId());
+      assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);
+
       foundQueryInfo = queryInfos.get(1);
       assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
       assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
       assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
+
+      foundQueryInfo = reader.getQueryByQueryId(queryInfo1.getQueryId());
+      assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
     } finally {
       writer.stop();
     }


Mime
View raw message