tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject git commit: TAJO-250: QueryMaster must send the query finish heartbeat. (hyunsik)
Date Sun, 13 Oct 2013 14:54:58 GMT
Updated Branches:
  refs/heads/master 2bbc3064c -> 4dcaffd3c


TAJO-250: QueryMaster must send the query finish heartbeat. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/4dcaffd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/4dcaffd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/4dcaffd3

Branch: refs/heads/master
Commit: 4dcaffd3c8d1da081c3943af0e89d225f38832f5
Parents: 2bbc306
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Sun Oct 13 23:54:17 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Sun Oct 13 23:54:39 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/tajo/util/StringUtils.java  | 57 ++++++++++++++++++++
 .../apache/tajo/master/querymaster/Query.java   |  3 +-
 .../tajo/master/querymaster/QueryInfo.java      |  2 +-
 .../master/querymaster/QueryJobManager.java     |  5 +-
 .../tajo/master/querymaster/QueryMaster.java    | 40 +++++++++-----
 .../src/main/resources/webapps/admin/query.jsp  | 10 ++--
 7 files changed, 99 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4dcaffd3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0dcb42d..2967daf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -197,6 +197,8 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-250: QueryMaster must send the query finish heartbeat. (hyunsik)
+
     TAJO-245: org.apache.tajo.algebra.FunctionExpr cannot be cast to 
     org.apache.tajo.algebra.ColumnReferenceExpr. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4dcaffd3/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
new file mode 100644
index 0000000..b5aa61c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+public class StringUtils {
+  /**
+   *
+   * Given the time in long milliseconds, returns a
+   * String in the format X hrs, Y mins, S sec, M msecs
+   *
+   * @param timeDiff The time difference to format
+   */
+  public static String formatTime(long timeDiff){
+    StringBuilder buf = new StringBuilder();
+    long hours = timeDiff / (60*60*1000);
+    long rem = (timeDiff % (60*60*1000));
+    long minutes =  rem / (60*1000);
+    rem = rem % (60*1000);
+    long seconds = rem / 1000;
+
+    if (hours != 0){
+      buf.append(hours);
+      buf.append(" hrs, ");
+    }
+    if (minutes != 0){
+      buf.append(minutes);
+      buf.append(" mins, ");
+    }
+
+    if (seconds != 0) {
+      buf.append(seconds);
+      buf.append(" sec");
+    }
+
+    if (timeDiff < 1000) {
+      buf.append(timeDiff);
+      buf.append(" msec");
+    }
+    return buf.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4dcaffd3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index af49457..38fbfe2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -328,10 +328,11 @@ public class Query implements EventHandler<QueryEvent> {
               catalog.addTable(finalTableDesc);
             }
             query.setResultDesc(finalTableDesc);
+            query.finished(QueryState.QUERY_SUCCEEDED);
             query.eventHandler.handle(new QueryFinishEvent(query.getId()));
           }
 
-          return query.finished(QueryState.QUERY_SUCCEEDED);
+          return QueryState.QUERY_SUCCEEDED;
         }
       } else {
         // if at least one subquery is failed, the query is also failed.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4dcaffd3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index 89fc6fe..89084d1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -122,6 +122,6 @@ public class QueryInfo {
 
   @Override
   public String toString() {
-    return queryId.toString() + ", queryMaster=" + queryMasterResource;
+    return queryId.toString() + "state=" + queryState +", queryMaster=" + queryMasterResource;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4dcaffd3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 669dee3..75a58bc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -174,12 +174,15 @@ public class QueryJobManager extends CompositeService {
       queryMasterResource.setManagerPort(queryHeartbeat.getTajoWorkerPort());
       queryMasterResource.setClientPort(queryHeartbeat.getTajoWorkerClientPort());
       queryMasterResource.setPullServerPort(queryHeartbeat.getTajoWorkerPullServerPort());
-
       queryInfo.setQueryMasterResource(queryMasterResource);
     }
     queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
     queryInfo.setQueryState(queryHeartbeat.getState());
 
+    if (queryHeartbeat.hasQueryFinishTime()) {
+      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
+    }
+
     return queryInfo;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4dcaffd3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index d32d7ce..d41a46f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -30,19 +30,22 @@ import org.apache.hadoop.yarn.service.Service;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.GlobalPlanner;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.QueryStartEvent;
+import org.apache.tajo.rpc.CallFuture2;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
+
 // TODO - when exception, send error status to QueryJobManager
 public class QueryMaster extends CompositeService implements EventHandler {
   private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
@@ -155,7 +158,7 @@ public class QueryMaster extends CompositeService implements EventHandler
{
   public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state)
{
     LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
     try {
-      TajoMasterProtocol.TajoHeartbeat.Builder queryHeartbeatBuilder = TajoMasterProtocol.TajoHeartbeat.newBuilder()
+      TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
           .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
           .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
           .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
@@ -262,6 +265,15 @@ public class QueryMaster extends CompositeService implements EventHandler
{
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);
         }
+
+        TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
+        CallFuture2 futuer = new CallFuture2();
+        workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, futuer);
+        try {
+          futuer.get(3000, TimeUnit.SECONDS);
+        } catch (Throwable e) {
+          LOG.warn(e);
+        }
         finishedQueryMasterTasks.put(queryId, queryMasterTask);
       } else {
         LOG.warn("No query info:" + queryId);
@@ -272,6 +284,19 @@ public class QueryMaster extends CompositeService implements EventHandler
{
     }
   }
 
+  private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
+    TajoHeartbeat queryHeartbeat = TajoHeartbeat.newBuilder()
+        .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
+        .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+        .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+        .setState(queryMasterTask.getState())
+        .setQueryId(queryMasterTask.getQueryId().getProto())
+        .setQueryProgress(queryMasterTask.getQuery().getProgress())
+        .setQueryFinishTime(queryMasterTask.getQuery().getFinishTime())
+        .build();
+    return queryHeartbeat;
+  }
+
   private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
     @Override
     public void handle(QueryStartEvent event) {
@@ -303,16 +328,7 @@ public class QueryMaster extends CompositeService implements EventHandler
{
         synchronized(queryMasterTasks) {
           for(QueryMasterTask eachTask: tempTasks) {
             try {
-              TajoMasterProtocol.TajoHeartbeat queryHeartbeat = TajoMasterProtocol.TajoHeartbeat.newBuilder()
-                  .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
-                  .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
-                  .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
-                  .setState(eachTask.getState())
-                  .setQueryId(eachTask.getQueryId().getProto())
-                  .setQueryProgress(eachTask.getQuery().getProgress())
-                  .setQueryFinishTime(eachTask.getQuery().getFinishTime())
-                  .build();
-
+              TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
               workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
             } catch (Throwable t) {
               t.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4dcaffd3/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
index 7ca6055..7a5fdee 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -52,7 +52,7 @@
       <td><%=eachQuery.getQueryInfo().getQueryMasterHost()%></td>
       <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
       <td><%=(int)(eachQuery.getQueryInfo().getProgress() * 100.0f)%>%</td>
-      <td><%=(int)(time/1000)%> sec</td>
+      <td><%=StringUtils.formatTime(time)%></td>
       <td><%=eachQuery.getQueryInfo().getSql()%></td>
     </tr>
     <%
@@ -74,8 +74,8 @@
     <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Finished</th><th>Time</th><th>Status</th><th>sql</th></tr>
     <%
       for(QueryInProgress eachQuery: finishedQueries) {
-        long runTime = eachQuery.getQueryInfo().getFinishTime() == 0 ? -1 :
-                eachQuery.getQueryInfo().getFinishTime() - eachQuery.getQueryInfo().getStartTime();
+        long runTime = eachQuery.getQueryInfo().getFinishTime() >= 0 ?
+                eachQuery.getQueryInfo().getFinishTime() - eachQuery.getQueryInfo().getStartTime()
: -1;
         String detailView = "http://" + eachQuery.getQueryInfo().getQueryMasterHost() + ":"
+ workerHttpPort +
                 "/querydetail.jsp?queryId=" + eachQuery.getQueryId();
     %>
@@ -83,8 +83,8 @@
       <td><a href='<%=detailView%>'><%=eachQuery.getQueryId()%></a></td>
       <td><%=eachQuery.getQueryInfo().getQueryMasterHost()%></td>
       <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
-      <td><%=df.format(eachQuery.getQueryInfo().getFinishTime())%></td>
-      <td><%=runTime%> ms</td>
+      <td><%=eachQuery.getQueryInfo().getFinishTime() >= 0 ? df.format(eachQuery.getQueryInfo().getFinishTime())
: "N/A"%></td>
+      <td><%=runTime == -1 ? "N/A" : StringUtils.formatTime(runTime) %></td>
       <td><%=eachQuery.getQueryInfo().getQueryState()%></td>
       <td><%=eachQuery.getQueryInfo().getSql()%></td>
     </tr>


Mime
View raw message