tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1205: Remove possible memory leak in TajoMaster. (jinho)
Date Fri, 21 Nov 2014 07:24:08 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 965cbd907 -> 80afe993b


TAJO-1205: Remove possible memory leak in TajoMaster. (jinho)

Closes #265


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/80afe993
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/80afe993
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/80afe993

Branch: refs/heads/master
Commit: 80afe993b82d57582fbeab64d20199f4dfa3d9af
Parents: 965cbd9
Author: jhkim <jhkim@apache.org>
Authored: Fri Nov 21 16:23:32 2014 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Fri Nov 21 16:23:32 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../tajo/master/TajoMasterClientService.java    | 32 ++++-----
 .../master/querymaster/QueryJobManager.java     | 70 ++++++++++++++++----
 .../master/querymaster/QueryMasterTask.java     |  1 +
 .../tajo/webapp/QueryExecutorServlet.java       | 24 +++++--
 .../src/main/resources/webapps/admin/index.jsp  | 42 ++----------
 .../resources/webapps/admin/query_executor.jsp  |  4 +-
 .../org/apache/tajo/worker/TestHistory.java     |  6 +-
 8 files changed, 107 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 42ab10a..d5c7f1c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -63,6 +63,8 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1205: Remove possible memory leak in TajoMaster. (jinho)
+
     TAJO-1181: Avro schema URL should support various protocols. 
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 2c81cd0..540bd71 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.NoSuchDatabaseException;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -57,8 +60,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.history.QueryHistory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -319,22 +320,23 @@ public class TajoMasterClientService extends AbstractService {
 
         // if we cannot get a QueryInProgress instance from QueryJobManager,
         // the instance can be in the finished query list.
+        QueryInfo queryInfo = null;
         if (queryInProgress == null) {
-          queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId);
+          queryInfo = context.getQueryJobManager().getFinishedQuery(queryId);
+        } else {
+          queryInfo = queryInProgress.getQueryInfo();
         }
 
         GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder();
 
-        // If we cannot the QueryInProgress instance from the finished list,
+        // If we cannot the QueryInfo instance from the finished list,
         // the query result was expired due to timeout.
         // In this case, we will result in error.
-        if (queryInProgress == null) {
+        if (queryInfo == null) {
           builder.setErrorMessage("No such query: " + queryId.toString());
           return builder.build();
         }
 
-        QueryInfo queryInfo = queryInProgress.getQueryInfo();
-
         try {
           //TODO After implementation Tajo's user security feature, Should be modified.
           builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
@@ -404,14 +406,12 @@ public class TajoMasterClientService extends AbstractService {
         context.getSessionManager().touch(request.getSessionId().getId());
         GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder();
 
-        Collection<QueryInProgress> queries
+        Collection<QueryInfo> queries
             = context.getQueryJobManager().getFinishedQueries();
 
         BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
 
-        for (QueryInProgress queryInProgress : queries) {
-          QueryInfo queryInfo = queryInProgress.getQueryInfo();
-
+        for (QueryInfo queryInfo : queries) {
           infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
           infoBuilder.setState(queryInfo.getQueryState());
           infoBuilder.setQuery(queryInfo.getSql());
@@ -452,12 +452,14 @@ public class TajoMasterClientService extends AbstractService {
           QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
 
           // It will try to find a query status from a finished query list.
+          QueryInfo queryInfo = null;
           if (queryInProgress == null) {
-            queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId);
+            queryInfo = context.getQueryJobManager().getFinishedQuery(queryId);
+          } else {
+            queryInfo = queryInProgress.getQueryInfo();
           }
 
-          if (queryInProgress != null) {
-            QueryInfo queryInfo = queryInProgress.getQueryInfo();
+          if (queryInfo != null) {
             builder.setResultCode(ResultCode.OK);
             builder.setState(queryInfo.getQueryState());
             builder.setProgress(queryInfo.getProgress());

http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index bcca039..536f6ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,18 +28,19 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoProtos;
-import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.scheduler.SimpleFifoScheduler;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class QueryJobManager extends CompositeService {
   private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
@@ -54,7 +56,10 @@ public class QueryJobManager extends CompositeService {
 
   private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
 
-  private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
+  private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
+  private AtomicLong maxExecutionTime = new AtomicLong();
+  private AtomicLong avgExecutionTime = new AtomicLong();
+  private AtomicLong executedQuerySize = new AtomicLong();
 
   public QueryJobManager(final TajoMaster.MasterContext masterContext) {
     super(QueryJobManager.class.getName());
@@ -110,9 +115,22 @@ public class QueryJobManager extends CompositeService {
     }
   }
 
-  public Collection<QueryInProgress> getFinishedQueries() {
-    synchronized (finishedQueries){
-      return Collections.unmodifiableCollection(finishedQueries.values());
+  public synchronized Collection<QueryInfo> getFinishedQueries() {
+    try {
+      return this.masterContext.getHistoryReader().getQueries(null);
+    } catch (Throwable e) {
+      LOG.error(e);
+      return Lists.newArrayList();
+    }
+  }
+
+
+  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
+    try {
+      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+    } catch (Throwable e) {
+      LOG.error(e);
+      return null;
     }
   }
 
@@ -194,12 +212,6 @@ public class QueryJobManager extends CompositeService {
     return queryInProgress;
   }
 
-  public QueryInProgress getFinishedQuery(QueryId queryId) {
-    synchronized(finishedQueries) {
-      return finishedQueries.get(queryId);
-    }
-  }
-
   public void stopQuery(QueryId queryId) {
     LOG.info("Stop QueryInProgress:" + queryId);
     QueryInProgress queryInProgress = getQueryInProgress(queryId);
@@ -213,14 +225,46 @@ public class QueryJobManager extends CompositeService {
         runningQueries.remove(queryId);
       }
 
-      synchronized(finishedQueries) {
-        finishedQueries.put(queryId, queryInProgress);
+      QueryInfo queryInfo = queryInProgress.getQueryInfo();
+      long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
+      if (executionTime < minExecutionTime.get()) {
+        minExecutionTime.set(executionTime);
       }
+
+      if (executionTime > maxExecutionTime.get()) {
+        maxExecutionTime.set(executionTime);
+      }
+
+      long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
+      if (totalExecutionTime > 0) {
+        avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
+      } else {
+        avgExecutionTime.set(executionTime);
+      }
+      executedQuerySize.incrementAndGet();
+      removeService(queryInProgress);
     } else {
       LOG.warn("No QueryInProgress while query stopping: " + queryId);
     }
   }
 
+  public long getMinExecutionTime() {
+    if (getExecutedQuerySize() == 0) return 0;
+    return minExecutionTime.get();
+  }
+
+  public long getMaxExecutionTime() {
+    return maxExecutionTime.get();
+  }
+
+  public long getAvgExecutionTime() {
+    return avgExecutionTime.get();
+  }
+
+  public long getExecutedQuerySize() {
+    return executedQuerySize.get();
+  }
+
   private void catchException(QueryId queryId, Exception e) {
     LOG.error(e.getMessage(), e);
     QueryInProgress queryInProgress = runningQueries.get(queryId);

http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 8ba9600..5cf3df5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -354,6 +354,7 @@ public class QueryMasterTask extends CompositeService {
       LogicalPlanner planner = new LogicalPlanner(catalog);
       LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
+      jsonExpr = null; // remove the possible OOM
       LogicalPlan plan = planner.createPlan(queryContext, expr);
       optimizer.optimize(queryContext, plan);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index 23311ac..0075b04 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -29,10 +29,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +59,7 @@ public class QueryExecutorServlet extends HttpServlet {
   ObjectMapper om = new ObjectMapper();
 
   //queryRunnerId -> QueryRunner
+  //TODO We must handle the session.
   private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>();
 
   private TajoConf tajoConf;
@@ -100,11 +98,29 @@ public class QueryExecutorServlet extends HttpServlet {
       }
 
       if("runQuery".equals(action)) {
+        String prevQueryRunnerId = request.getParameter("prevQueryId");
+        if (prevQueryRunnerId != null) {
+          synchronized (queryRunners) {
+            QueryRunner runner = queryRunners.remove(prevQueryRunnerId);
+            if (runner != null) runner.setStop();
+          }
+        }
+
+        float allowedMemoryRatio = 0.5f; // if TajoMaster memory usage is over 50%, the request will be canceled
+        long maxMemory = Runtime.getRuntime().maxMemory();
+        long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+        if(usedMemory > maxMemory * allowedMemoryRatio) {
+          errorResponse(response, "Allowed memory size of " +
+              (maxMemory * allowedMemoryRatio) / (1024 * 1024) + " MB exhausted");
+          return;
+        }
+
         String query = request.getParameter("query");
         if(query == null || query.trim().isEmpty()) {
           errorResponse(response, "No query parameter");
           return;
         }
+
         String queryRunnerId = null;
         while(true) {
           synchronized(queryRunners) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index ce4d7dc..30cbf88 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -85,38 +85,6 @@
   String numDeadWorkersHtml = numDeadWorkers == 0 ? "0" : "<font color='red'>" + numDeadWorkers + "</font>";
   String numDeadQueryMastersHtml = numDeadQueryMasters == 0 ? "0" : "<font color='red'>" + numDeadQueryMasters + "</font>";
 
-  Collection<QueryInProgress> runningQueries = master.getContext().getQueryJobManager().getRunningQueries();
-  Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
-
-  int avgQueryTime = 0;
-  int minQueryTime = Integer.MAX_VALUE;
-  int maxQueryTime = 0;
-
-  long totalTime = 0;
-  for(QueryInProgress eachQuery: finishedQueries) {
-    int runTime = (int)(eachQuery.getQueryInfo().getFinishTime() == 0 ? -1 :
-            eachQuery.getQueryInfo().getFinishTime() - eachQuery.getQueryInfo().getStartTime());
-    if(runTime > 0) {
-      totalTime += runTime;
-
-      if(runTime < minQueryTime) {
-        minQueryTime = runTime;
-      }
-
-      if(runTime > maxQueryTime) {
-        maxQueryTime = runTime;
-      }
-    }
-  }
-
-  if(minQueryTime == Integer.MAX_VALUE) {
-    minQueryTime = 0;
-  }
-  if(finishedQueries.size() > 0) {
-    avgQueryTime = (int)(totalTime / (long)finishedQueries.size());
-  }
-
-
   HAService haService = master.getContext().getHAService();
   List<TajoMasterInfo> masters = TUtil.newList();
 
@@ -217,11 +185,11 @@
   <table width="100%" class="border_table" border="1">
     <tr><th>Running Queries</th><th>Finished Queries</th><th>Average Execution Time</th><th>Min. Execution Time</th><th>Max. Execution Time</th></tr>
     <tr>
-      <td align='right'><%=runningQueries.size()%></td>
-      <td align='right'><%=finishedQueries.size()%></td>
-      <td align='left'><%=avgQueryTime/1000%> sec</td>
-      <td align='left'><%=minQueryTime/1000%> sec</td>
-      <td align='left'><%=maxQueryTime/1000%> sec</td>
+      <td align='right'><%=master.getContext().getQueryJobManager().getRunningQueries().size()%></td>
+      <td align='right'><%=master.getContext().getQueryJobManager().getExecutedQuerySize() %></td>
+      <td align='left'><%=master.getContext().getQueryJobManager().getAvgExecutionTime()/1000%> sec</td>
+      <td align='left'><%=master.getContext().getQueryJobManager().getMinExecutionTime()/1000%> sec</td>
+      <td align='left'><%=master.getContext().getQueryJobManager().getMaxExecutionTime()/1000%> sec</td>
     </tr>
   </table>
 </div>

http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
index c46fcb1..bbd1820 100644
--- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
@@ -96,7 +96,7 @@ function runQuery() {
   $.ajax({
     type: "POST",
     url: "query_exec",
-    data: { action: "runQuery", query: query, limitSize:SIZE_LIMIT, database: sbox.options[sbox.selectedIndex].text }
+    data: { action: "runQuery", query: query, prevQueryId: queryRunnerId, limitSize:SIZE_LIMIT, database: sbox.options[sbox.selectedIndex].text }
   })
   .done(function(msg) {
     var resultJson = $.parseJSON(msg);
@@ -329,7 +329,7 @@ function getPage() {
   <hr/>
   <div id="queryResultTools"></div>
   <hr/>
-  <div style="dispaly:none;"><form name="dataForm" id="dataForm" method="post" action="getCSV.jsp"><input type="hidden" id="csvData" name="csvData" value="" /></div>
+  <div style="display:none;"><form name="dataForm" id="dataForm" method="post" action="getCSV.jsp"><input type="hidden" id="csvData" name="csvData" value="" /></div>
 </div>
 </body>
 </html>

http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
index c68d3a4..fa90b61 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
@@ -28,7 +28,7 @@ import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,7 +64,7 @@ public class TestHistory {
     int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
     client.executeQueryAndGetResult("select sleep(1) from lineitem");
 
-    Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
+    Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
     assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
 
     TajoWorker worker = cluster.getTajoWorkers().get(0);
@@ -91,7 +91,7 @@ public class TestHistory {
     int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
     client.executeQueryAndGetResult("select sleep(1) from lineitem");
 
-    Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
+    Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
     assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
 
     TajoWorker worker = cluster.getTajoWorkers().get(0);


Mime
View raw message