hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1167578 - in /hbase/trunk: ./ src/main/jamon/org/apache/hbase/tmpl/common/ src/main/jamon/org/apache/hbase/tmpl/master/ src/main/jamon/org/apache/hbase/tmpl/regionserver/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/...
Date Sat, 10 Sep 2011 18:44:46 GMT
Author: stack
Date: Sat Sep 10 18:44:45 2011
New Revision: 1167578

URL: http://svn.apache.org/viewvc?rev=1167578&view=rev
Log:
HBASE-4057 Implement HBase version of "show processlist"

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon
    hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon
    hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java
    hbase/trunk/src/main/resources/hbase-webapps/static/hbase.css
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Sat Sep 10 18:44:45 2011
@@ -540,6 +540,7 @@ Release 0.91.0 - Unreleased
                (Riley Patterson)
    HBASE-4292  Add a debugging dump servlet to the master and regionserver
                (todd)
+   HBASE-4057  Implement HBase version of "show processlist" (Riley Patterson)
 
 Release 0.90.5 - Unreleased
 

Modified: hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon (original)
+++ hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon Sat Sep 10 18:44:45 2011
@@ -20,40 +20,74 @@ limitations under the License.
 <%import>
 java.util.*;
 org.apache.hadoop.hbase.monitoring.*;
+org.apache.hadoop.util.StringUtils;
 </%import>
 <%args>
 TaskMonitor taskMonitor = TaskMonitor.get();
+String filter = "general";
+String format = "html";
 </%args>
 <%java>
+List<? extends MonitoredTask> tasks = taskMonitor.getTasks();
+Iterator<? extends MonitoredTask> iter = tasks.iterator();
+// apply requested filter
+while (iter.hasNext()) {
+  MonitoredTask t = iter.next();
+  if (filter.equals("general")) {
+    if (t instanceof MonitoredRPCHandler)
+      iter.remove();
+  } else if (filter.equals("handler")) {
+    if (!(t instanceof MonitoredRPCHandler))
+      iter.remove();
+  } else if (filter.equals("rpc")) {
+    if (!(t instanceof MonitoredRPCHandler) || 
+        !((MonitoredRPCHandler) t).isRPCRunning())
+      iter.remove();
+  } else if (filter.equals("operation")) {
+    if (!(t instanceof MonitoredRPCHandler) || 
+        !((MonitoredRPCHandler) t).isOperationRunning())
+      iter.remove();
+  }
+}
 long now = System.currentTimeMillis();
-List<MonitoredTask> tasks = taskMonitor.getTasks();
 Collections.reverse(tasks);
-
+boolean first = true;
 </%java>
-<h2>Currently running tasks</h2>
-
-<%if tasks.isEmpty()%>
-No tasks currently running on this node.
+<%if format.equals("json")%>
+[<%for MonitoredTask task : tasks%><%if first%><%java>first = false;</%java><%else>,</%if><% task.toJSON() %></%for>]
 <%else>
+  <div style="float:right;">
+    <a href="?filter=all">Show All Monitored Tasks</a> |
+    <a href="?filter=general">Show non-RPC Tasks</a> |
+    <a href="?filter=handler">Show All RPC Handler Tasks</a> |
+    <a href="?filter=rpc">Show Active RPC Calls</a> |
+    <a href="?filter=operation">Show Client Operations</a> |
+    <a href="?format=json&filter=<% filter %>">View as JSON</a>
+  </div>
+  <h2>Recent tasks</h2>
+  <%if tasks.isEmpty()%>
+    No tasks currently running on this node.
+  <%else>
+    <table>
+    <tr>
+      <th>Start Time</th>
+      <th>Description</th>
+      <th>State</th>
+      <th>Status</th>
+    </tr>
+    <%for MonitoredTask task : tasks %>
+    <tr class="task-monitor-<% task.getState() %>">
+      <td><% new Date(task.getStartTime()) %></td>
+      <td><% task.getDescription() %></td>
+      <td><% task.getState() %>
+          (since <% StringUtils.formatTimeDiff(now, task.getStateTime()) %> ago)
+      </td>
+      <td><% task.getStatus() %>
+          (since <% StringUtils.formatTimeDiff(now, task.getStatusTime()) %> 
+          ago)</td>
+    </tr>
+    </%for>
+    </table>
 
-<table>
-<tr>
-  <th>Description</th>
-  <th>Status</th>
-  <th>Age</th>
-</tr>
-<%for MonitoredTask task : tasks %>
-<tr class="task-monitor-<% task.getState() %>">
-  <td><% task.getDescription() %></td>
-  </td>
-  <td><% task.getStatus() %></td>
-  <td><% (int)((now - task.getStartTime())/1000) %>s  
-  <%if task.getCompletionTimestamp() != -1%>
-  (Completed <% (now - task.getCompletionTimestamp())/1000 %>s ago)
   </%if>
-  </td>
-</tr>
-</%for>
-</table>
-
-</%if>
\ No newline at end of file
+</%if>

Modified: hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon (original)
+++ hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon Sat Sep 10 18:44:45 2011
@@ -25,6 +25,8 @@ ServerName rootLocation = null;
 ServerName metaLocation = null;
 List<ServerName> servers = null;
 boolean showAppendWarning = false;
+String filter = "general";
+String format = "html";
 </%args>
 <%import>
 java.util.*;
@@ -40,6 +42,10 @@ org.apache.hadoop.hbase.client.HBaseAdmi
 org.apache.hadoop.hbase.client.HConnectionManager;
 org.apache.hadoop.hbase.HTableDescriptor;
 </%import>
+<%if format.equals("json") %>
+  <& ../common/TaskMonitorTmpl; filter = filter; format = "json" &>
+  <%java return; %>
+</%if>
 <?xml version="1.0" encoding="UTF-8" ?>
 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" 
   "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> 
@@ -94,7 +100,7 @@ org.apache.hadoop.hbase.HTableDescriptor
 <tr><td>Zookeeper Quorum</td><td><% master.getZooKeeperWatcher().getQuorum() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
 </table>
 
-<& ../common/TaskMonitorTmpl &>
+<& ../common/TaskMonitorTmpl; filter = filter &>
 
 <%if (rootLocation != null) %>
 <& catalogTables &>

Modified: hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (original)
+++ hbase/trunk/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon Sat Sep 10 18:44:45 2011
@@ -19,6 +19,8 @@ limitations under the License.
 </%doc>
 <%args>
 HRegionServer regionServer;
+String filter = "general";
+String format = "html";
 </%args>
 <%import>
 java.util.*;
@@ -33,6 +35,10 @@ org.apache.hadoop.hbase.HServerInfo;
 org.apache.hadoop.hbase.HServerLoad;
 org.apache.hadoop.hbase.HRegionInfo;
 </%import>
+<%if format.equals("json") %>
+  <& ../common/TaskMonitorTmpl; filter = filter; format = "json" &>
+  <%java return; %>
+</%if>
 <%java>
   HServerInfo serverInfo = null;
   try {
@@ -73,7 +79,7 @@ org.apache.hadoop.hbase.HRegionInfo;
 <tr><td>Zookeeper Quorum</td><td><% regionServer.getZooKeeper().getQuorum() %></td><td>Addresses of all registered ZK servers</td></tr>
 </table>
 
-<& ../common/TaskMonitorTmpl &>
+<& ../common/TaskMonitorTmpl; filter = filter &>
 
 <h2>Online Regions</h2>
 <%if (onlineRegions != null && onlineRegions.size() > 0) %>

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sat Sep 10 18:44:45 2011
@@ -59,6 +59,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -1029,6 +1031,10 @@ public abstract class HBaseServer implem
       return hostAddress;
     }
 
+    public int getRemotePort() {
+      return remotePort;
+    }
+
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
     }
@@ -1176,6 +1182,7 @@ public abstract class HBaseServer implem
   /** Handles queued calls . */
   private class Handler extends Thread {
     private final BlockingQueue<Call> myCallQueue;
+    private MonitoredRPCHandler status;
 
     public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
       this.myCallQueue = cq;
@@ -1187,15 +1194,21 @@ public abstract class HBaseServer implem
         threadName = "PRI " + threadName;
       }
       this.setName(threadName);
+      this.status = TaskMonitor.get().createRPCStatus(threadName);
     }
 
     @Override
     public void run() {
       LOG.info(getName() + ": starting");
+      status.setStatus("starting");
       SERVER.set(HBaseServer.this);
       while (running) {
         try {
+          status.pause("Waiting for a call");
           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
+          status.setStatus("Setting up call");
+          status.setConnection(call.connection.getHostAddress(), 
+              call.connection.getRemotePort());
 
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
@@ -1209,7 +1222,9 @@ public abstract class HBaseServer implem
           try {
             if (!started)
               throw new ServerNotRunningYetException("Server is not running yet");
-            value = call(call.connection.protocol, call.param, call.timestamp);             // make the call
+            // make the call
+            value = call(call.connection.protocol, call.param, call.timestamp, 
+                status);
           } catch (Throwable e) {
             LOG.debug(getName()+", call "+call+": error: " + e, e);
             errorClass = e.getClass().getName();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Sat Sep 10 18:44:45 2011
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.ipc;
 import com.google.common.base.Function;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -48,7 +49,7 @@ public interface RpcServer {
    * @throws java.io.IOException e
    */
   Writable call(Class<? extends VersionedProtocol> protocol,
-      Writable param, long receiveTime)
+      Writable param, long receiveTime, MonitoredRPCHandler status)
       throws IOException;
 
   int getNumOpenConnections();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Sat Sep 10 18:44:45 2011
@@ -38,6 +38,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Objects;
@@ -316,7 +317,7 @@ class WritableRpcEngine implements RpcEn
 
     @Override
     public Writable call(Class<? extends VersionedProtocol> protocol,
-        Writable param, long receivedTime)
+        Writable param, long receivedTime, MonitoredRPCHandler status)
     throws IOException {
       try {
         Invocation call = (Invocation)param;
@@ -325,6 +326,9 @@ class WritableRpcEngine implements RpcEn
               "cause is a version mismatch between client and server.");
         }
         if (verbose) log("Call: " + call);
+        status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
+        status.setRPCPacket(param);
+        status.resume("Servicing call");
 
         Method method =
           protocol.getMethod(call.getMethodName(),
@@ -369,7 +373,8 @@ class WritableRpcEngine implements RpcEn
           // when tagging, we let TooLarge trump TooSmall to keep output simple
           // note that large responses will often also be slow.
           logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"),
-              startTime, processingTime, qTime, responseSize);
+              status.getClient(), startTime, processingTime, qTime,
+              responseSize);
           // provides a count of log-reported slow responses
           if (tooSlow) {
             rpcMetrics.rpcSlowResponseTime.inc(processingTime);
@@ -407,13 +412,14 @@ class WritableRpcEngine implements RpcEn
      * client Operations.
      * @param call The call to log.
      * @param tag  The tag that will be used to indicate this event in the log.
+     * @param clientAddress   The address of the client who made this call.
      * @param startTime       The time that the call was initiated, in ms.
      * @param processingTime  The duration that the call took to run, in ms.
      * @param qTime           The duration that the call spent on the queue 
      *                        prior to being initiated, in ms.
      * @param responseSize    The size in bytes of the response buffer.
      */
-    private void logResponse(Invocation call, String tag,
+    private void logResponse(Invocation call, String tag, String clientAddress,
         long startTime, int processingTime, int qTime, long responseSize)
       throws IOException {
       Object params[] = call.getParameters();
@@ -425,6 +431,7 @@ class WritableRpcEngine implements RpcEn
       responseInfo.put("processingtimems", processingTime);
       responseInfo.put("queuetimems", qTime);
       responseInfo.put("responsesize", responseSize);
+      responseInfo.put("client", clientAddress);
       responseInfo.put("class", instance.getClass().getSimpleName());
       responseInfo.put("method", call.getMethodName());
       if (params.length == 2 && instance instanceof HRegionServer &&

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java Sat Sep 10 18:44:45 2011
@@ -61,13 +61,17 @@ public class MasterStatusServlet extends
     List<ServerName> servers = master.getServerManager().getOnlineServersList();
 
     response.setContentType("text/html");
-    new MasterStatusTmpl()
+    MasterStatusTmpl tmpl = new MasterStatusTmpl()
       .setFrags(frags)
       .setShowAppendWarning(shouldShowAppendWarning(conf))
       .setRootLocation(rootLocation)
       .setMetaLocation(metaLocation)
-      .setServers(servers)
-      .render(response.getWriter(),
+      .setServers(servers);
+    if (request.getParameter("filter") != null)
+      tmpl.setFilter(request.getParameter("filter"));
+    if (request.getParameter("format") != null)
+      tmpl.setFormat(request.getParameter("format"));
+    tmpl.render(response.getWriter(),
           master, admin);
   }
 

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java?rev=1167578&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java Sat Sep 10 18:44:45 2011
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A MonitoredTask implementation optimized for use with RPC Handlers 
+ * handling frequent, short duration tasks. String concatenations and object 
+ * allocations are avoided in methods that will be hit by every RPC call.
+ */
+public interface MonitoredRPCHandler extends MonitoredTask {
+  public abstract String getRPC();
+  public abstract String getRPC(boolean withParams);
+  public abstract long getRPCPacketLength();
+  public abstract String getClient();
+  public abstract long getRPCStartTime();
+  public abstract long getRPCQueueTime();
+  public abstract boolean isRPCRunning();
+  public abstract boolean isOperationRunning();
+  
+  public abstract void setRPC(String methodName, Object [] params,
+      long queueTime);
+  public abstract void setRPCPacket(Writable param);
+  public abstract void setConnection(String clientAddress, int remotePort);
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1167578&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Sat Sep 10 18:44:45 2011
@@ -0,0 +1,256 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A MonitoredTask implementation designed for use with RPC Handlers 
+ * handling frequent, short duration tasks. String concatenations and object 
+ * allocations are avoided in methods that will be hit by every RPC call.
+ */
+public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl 
+  implements MonitoredRPCHandler {
+  private String clientAddress;
+  private int remotePort;
+  private long rpcQueueTime;
+  private long rpcStartTime;
+  private String methodName = "";
+  private Object [] params = {};
+  private Writable packet;
+
+  public MonitoredRPCHandlerImpl() {
+    super();
+    // in this implementation, WAITING indicates that the handler is not 
+    // actively servicing an RPC call.
+    setState(State.WAITING);
+  }
+
+  @Override
+  public synchronized MonitoredRPCHandlerImpl clone() {
+    return (MonitoredRPCHandlerImpl) super.clone();
+  }
+
+  /**
+   * Gets the status of this handler; if it is currently servicing an RPC, 
+   * this status will include the RPC information.
+   * @return a String describing the current status.
+   */
+  @Override
+  public String getStatus() {
+    if (getState() != State.RUNNING) {
+      return super.getStatus();
+    }
+    return super.getStatus() + " from " + getClient() + ": " + getRPC();
+  }
+
+  /**
+   * Accesses the queue time for the currently running RPC on the 
+   * monitored Handler.
+   * @return the queue timestamp or -1 if there is no RPC currently running.
+   */
+  public long getRPCQueueTime() {
+    if (getState() != State.RUNNING) {
+      return -1;
+    }
+    return rpcQueueTime;
+  }
+
+  /**
+   * Accesses the start time for the currently running RPC on the 
+   * monitored Handler.
+   * @return the start timestamp or -1 if there is no RPC currently running.
+   */
+  public long getRPCStartTime() {
+    if (getState() != State.RUNNING) {
+      return -1;
+    }
+    return rpcStartTime;
+  }
+
+  /**
+   * Produces a string representation of the method currently being serviced
+   * by this Handler.
+   * @return a string representing the method call without parameters
+   */
+  public String getRPC() {
+    return getRPC(false);
+  }
+
+  /**
+   * Produces a string representation of the method currently being serviced
+   * by this Handler.
+   * @param withParams toggle inclusion of parameters in the RPC String
+   * @return A human-readable string representation of the method call.
+   */
+  public synchronized String getRPC(boolean withParams) {
+    if (getState() != State.RUNNING) {
+      // no RPC is currently running
+      return "";
+    }
+    StringBuilder buffer = new StringBuilder(256);
+    buffer.append(methodName);
+    if (withParams) {
+      buffer.append("(");
+      for (int i = 0; i < params.length; i++) {
+        if (i != 0)
+          buffer.append(", ");
+        buffer.append(params[i]);
+      }
+      buffer.append(")");
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Reports the size of the RPC packet currently being serviced by this Handler.
+   * @return the packet's writable size, or -1 if no RPC is running or the
+   *  packet is absent or does not expose size information.
+   */
+  public long getRPCPacketLength() {
+    if (getState() != State.RUNNING || packet == null) {
+      // no RPC is currently running, or we don't have an RPC's packet info
+      return -1L;
+    }
+    if (!(packet instanceof WritableWithSize)) {
+      // the packet passed to us doesn't expose size information
+      return -1L;
+    }
+    return ((WritableWithSize) packet).getWritableSize();
+  }
+
+  /**
+   * If an RPC call is currently running, produces a String representation of 
+   * the connection from which it was received.
+   * @return A human-readable string representation of the address and port 
+   *  of the client.
+   */
+  public String getClient() {
+    return clientAddress + ":" + remotePort;
+  }
+
+  /**
+   * Indicates to the client whether this task is monitoring a currently active 
+   * RPC call.
+   * @return true if the monitored handler is currently servicing an RPC call.
+   */
+  public boolean isRPCRunning() {
+    return getState() == State.RUNNING;
+  }
+
+  /**
+   * Indicates to the client whether this task is monitoring a currently active 
+   * RPC call to a database command. (as defined by 
+   * o.a.h.h.client.Operation)
+   * @return true if the monitored handler is currently servicing an RPC call
+   * to a database command.
+   */
+  public boolean isOperationRunning() {
+    if(!isRPCRunning()) {
+      return false;
+    }
+    for(Object param : params) {
+      if (param instanceof Operation) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Tells this instance that it is monitoring a new RPC call.
+   * @param methodName The name of the method that will be called by the RPC.
+   * @param params The parameters that will be passed to the indicated method.
+   */
+  public synchronized void setRPC(String methodName, Object [] params, 
+      long queueTime) {
+    this.methodName = methodName;
+    this.params = params;
+    this.rpcStartTime = System.currentTimeMillis();
+    this.rpcQueueTime = queueTime;
+    this.state = State.RUNNING;
+  }
+
+  /**
+   * Gives this instance a reference to the Writable received by the RPC, so 
+   * that it can later compute its size if asked for it.
+   * @param param The Writable received by the RPC for this call
+   */
+  public void setRPCPacket(Writable param) {
+    this.packet = param;
+  }
+
+  /**
+   * Registers current handler client details.
+   * @param clientAddress the address of the current client
+   * @param remotePort the port from which the client connected
+   */
+  public void setConnection(String clientAddress, int remotePort) {
+    this.clientAddress = clientAddress;
+    this.remotePort = remotePort;
+  }
+
+  public synchronized Map<String, Object> toMap() {
+    // only include RPC info if the Handler is actively servicing an RPC call
+    Map<String, Object> map = super.toMap();
+    if (getState() != State.RUNNING) {
+      return map;
+    }
+    Map<String, Object> rpcJSON = new HashMap<String, Object>();
+    ArrayList paramList = new ArrayList();
+    map.put("rpcCall", rpcJSON);
+    rpcJSON.put("queuetimems", getRPCQueueTime());
+    rpcJSON.put("starttimems", getRPCStartTime());
+    rpcJSON.put("clientaddress", clientAddress);
+    rpcJSON.put("remoteport", remotePort);
+    rpcJSON.put("packetlength", getRPCPacketLength());
+    rpcJSON.put("method", methodName);
+    rpcJSON.put("params", paramList);
+    for(Object param : params) {
+      if(param instanceof byte []) {
+        paramList.add(Bytes.toStringBinary((byte []) param));
+      } else if (param instanceof Operation) {
+        paramList.add(((Operation) param).toMap());
+      } else {
+        paramList.add(param.toString());
+      }
+    }
+    return map;
+  }
+
+  @Override
+  public String toString() {
+    if (getState() != State.RUNNING) {
+      return super.toString();
+    }
+    return super.toString() + ", rpcMethod=" + getRPC();
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java Sat Sep 10 18:44:45 2011
@@ -19,28 +19,32 @@
  */
 package org.apache.hadoop.hbase.monitoring;
 
-public interface MonitoredTask {
+import java.io.IOException;
+import java.util.Map;
+
+public interface MonitoredTask extends Cloneable {
   enum State {
     RUNNING,
+    WAITING,
     COMPLETE,
     ABORTED;
   }
 
   public abstract long getStartTime();
-
   public abstract String getDescription();
-
   public abstract String getStatus();
-
+  public abstract long getStatusTime();
   public abstract State getState();
-
+  public abstract long getStateTime();
   public abstract long getCompletionTimestamp();
 
   public abstract void markComplete(String msg);
+  public abstract void pause(String msg);
+  public abstract void resume(String msg);
   public abstract void abort(String msg);
+  public abstract void expireNow();
 
   public abstract void setStatus(String status);
-
   public abstract void setDescription(String description);
 
   /**
@@ -49,5 +53,24 @@ public interface MonitoredTask {
    */
   public abstract void cleanup();
 
+  /**
+   * Public exposure of Object.clone() in order to allow clients to easily 
+   * capture current state.
+   * @returns a copy of the object whose references will not change
+   */
+  public abstract MonitoredTask clone();
+
+  /**
+   * Creates a string map of internal details for extensible exposure of 
+   * monitored tasks.
+   * @return A Map containing information for this task.
+   */
+  public abstract Map<String, Object> toMap() throws IOException;
+
+  /**
+   * Creates a JSON object for parseable exposure of monitored tasks.
+   * @return An encoded JSON object containing information for this task.
+   */
+  public abstract String toJSON() throws IOException;
 
-}
\ No newline at end of file
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java Sat Sep 10 18:44:45 2011
@@ -19,19 +19,35 @@
  */
 package org.apache.hadoop.hbase.monitoring;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 class MonitoredTaskImpl implements MonitoredTask {
   private long startTime;
-  private long completionTimestamp = -1;
+  private long statusTime;
+  private long stateTime;
   
-  private String status;
-  private String description;
+  private volatile String status;
+  private volatile String description;
   
-  private State state = State.RUNNING;
+  protected volatile State state = State.RUNNING;
   
   public MonitoredTaskImpl() {
     startTime = System.currentTimeMillis();
+    statusTime = startTime;
+    stateTime = startTime;
+  }
+
+  @Override
+  public synchronized MonitoredTaskImpl clone() {
+    try {
+      return (MonitoredTaskImpl) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new AssertionError(); // Won't happen
+    }
   }
 
   @Override
@@ -48,6 +64,11 @@ class MonitoredTaskImpl implements Monit
   public String getStatus() {
     return status;
   }
+
+  @Override
+  public long getStatusTime() {
+    return statusTime;
+  }
   
   @Override
   public State getState() {
@@ -55,27 +76,51 @@ class MonitoredTaskImpl implements Monit
   }
   
   @Override
-  public long getCompletionTimestamp() {
-    return completionTimestamp;
+  public long getStateTime() {
+    return stateTime;
   }
   
   @Override
+  public long getCompletionTimestamp() {
+    if (state == State.COMPLETE || state == State.ABORTED) {
+      return stateTime;
+    }
+    return -1;
+  }
+
+  @Override
   public void markComplete(String status) {
-    state = State.COMPLETE;
+    setState(State.COMPLETE);
     setStatus(status);
-    completionTimestamp = System.currentTimeMillis();
+  }
+
+  @Override
+  public void pause(String msg) {
+    setState(State.WAITING);
+    setStatus(msg);
+  }
+
+  @Override
+  public void resume(String msg) {
+    setState(State.RUNNING);
+    setStatus(msg);
   }
 
   @Override
   public void abort(String msg) {
     setStatus(msg);
-    state = State.ABORTED;
-    completionTimestamp = System.currentTimeMillis();
+    setState(State.ABORTED);
   }
   
   @Override
   public void setStatus(String status) {
     this.status = status;
+    statusTime = System.currentTimeMillis();
+  }
+
+  protected void setState(State state) {
+    this.state = state;
+    stateTime = System.currentTimeMillis();
   }
 
   @Override
@@ -86,8 +131,7 @@ class MonitoredTaskImpl implements Monit
   @Override
   public void cleanup() {
     if (state == State.RUNNING) {
-      state = State.ABORTED;
-      completionTimestamp = System.currentTimeMillis();
+      setState(State.ABORTED);
     }
   }
 
@@ -95,8 +139,41 @@ class MonitoredTaskImpl implements Monit
    * Force the completion timestamp backwards so that
    * it expires now.
    */
-  @VisibleForTesting
-  void expireNow() {
-    completionTimestamp -= 180 * 1000;
+  public void expireNow() {
+    stateTime -= 180 * 1000;
   }
+
+  @Override
+  public Map<String, Object> toMap() {
+    Map<String, Object> map = new HashMap<String, Object>();
+    map.put("description", getDescription());
+    map.put("status", getStatus());
+    map.put("state", getState());
+    map.put("starttimems", getStartTime());
+    map.put("statustimems", getStatusTime()); // when the status last changed
+    map.put("statetimems", getStateTime()); // when the state last changed
+    return map;
+  }
+
+  @Override
+  public String toJSON() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.writeValueAsString(toMap());
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(512);
+    sb.append(getDescription());
+    sb.append(": status=");
+    sb.append(getStatus());
+    sb.append(", state=");
+    sb.append(getState());
+    sb.append(", startTime=");
+    sb.append(getStartTime());
+    sb.append(", completionTime=");
+    sb.append(getCompletionTimestamp());
+    return sb.toString();
+  }
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java Sat Sep 10 18:44:45 2011
@@ -71,12 +71,23 @@ public class TaskMonitor {
         stat.getClass().getClassLoader(),
         new Class<?>[] { MonitoredTask.class },
         new PassthroughInvocationHandler<MonitoredTask>(stat));
+    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
+    tasks.add(pair);
+    return proxy;
+  }
 
+  public MonitoredRPCHandler createRPCStatus(String description) {
+    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
+    stat.setDescription(description);
+    MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance(
+        stat.getClass().getClassLoader(),
+        new Class<?>[] { MonitoredRPCHandler.class },
+        new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
     TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
     tasks.add(pair);
     return proxy;
   }
-  
+
   private synchronized void purgeExpiredTasks() {
     int size = 0;
     
@@ -107,11 +118,17 @@ public class TaskMonitor {
     }
   }
 
+  /**
+   * Produces a list containing copies of the current state of all non-expired 
+   * MonitoredTasks handled by this TaskMonitor.
+   * @return A complete list of MonitoredTasks.
+   */
   public synchronized List<MonitoredTask> getTasks() {
     purgeExpiredTasks();
     ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
     for (TaskAndWeakRefPair pair : tasks) {
-      ret.add(pair.get());
+      MonitoredTask t = pair.get();
+      ret.add(t.clone());
     }
     return ret;
   }
@@ -181,7 +198,8 @@ public class TaskMonitor {
   }
   
   /**
-   * An InvocationHandler that simply passes through calls to the original object.
+   * An InvocationHandler that simply passes through calls to the original 
+   * object.
    */
   private static class PassthroughInvocationHandler<T> implements InvocationHandler {
     private T delegatee;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java Sat Sep 10 18:44:45 2011
@@ -40,7 +40,12 @@ public class RSStatusServlet extends Htt
     assert hrs != null : "No RS in context!";
     
     resp.setContentType("text/html");
-    new RSStatusTmpl().render(resp.getWriter(), hrs);
+    RSStatusTmpl tmpl = new RSStatusTmpl();
+    if (req.getParameter("format") != null)
+      tmpl.setFormat(req.getParameter("format"));
+    if (req.getParameter("filter") != null)
+      tmpl.setFilter(req.getParameter("filter"));
+    tmpl.render(resp.getWriter(), hrs);
   }
 
 }

Modified: hbase/trunk/src/main/resources/hbase-webapps/static/hbase.css
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-webapps/static/hbase.css?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-webapps/static/hbase.css (original)
+++ hbase/trunk/src/main/resources/hbase-webapps/static/hbase.css Sat Sep 10 18:44:45 2011
@@ -25,3 +25,9 @@ tr.task-monitor-COMPLETE td {
 tr.task-monitor-ABORTED td {
   background-color: #ffa;
 }
+
+tr.task-monitor-WAITING td {
+  background-color: #ccc;
+  font-style: italic;
+}
++ 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java?rev=1167578&r1=1167577&r2=1167578&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java Sat Sep 10 18:44:45 2011
@@ -44,13 +44,13 @@ public class TestTaskMonitor {
     
     // Mark it as finished
     task.markComplete("Finished!");
-    assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState());
+    assertEquals(MonitoredTask.State.COMPLETE, task.getState());
     
     // It should still show up in the TaskMonitor list
     assertEquals(1, tm.getTasks().size());
     
     // If we mark its completion time back a few minutes, it should get gced
-    ((MonitoredTaskImpl)taskFromTm).expireNow();
+    task.expireNow();
     assertEquals(0, tm.getTasks().size());
   }
   



Mime
View raw message