lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r1580466 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/handler/admin/ solr/core/src/test/org/apache/solr/cloud/ solr/solrj/ solr/solrj/src/java/org/apac...
Date Sun, 23 Mar 2014 07:32:20 GMT
Author: shalin
Date: Sun Mar 23 07:32:20 2014
New Revision: 1580466

URL: http://svn.apache.org/r1580466
Log:
SOLR-5749: A new Overseer status collection API exposes overseer queue sizes, timing statistics,
success and error counts and last N failures per operation

Added:
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
      - copied unchanged from r1580463, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1580466&r1=1580465&r2=1580466&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Sun Mar 23 07:32:20 2014
@@ -90,6 +90,9 @@ New Features
   improve logging and force refresh cluster state every 15 seconds.
   (Timothy Potter via shalin)
 
+* SOLR-5749: A new Overseer status collection API exposes overseer queue sizes, timing
+  statistics, success and error counts and last N failures per operation. (shalin)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1580466&r1=1580465&r2=1580466&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Sun Mar 23 07:32:20 2014
@@ -26,6 +26,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -52,10 +53,16 @@ public class DistributedQueue {
   private final String prefix = "qn-";
   
   private final String response_prefix = "qnr-" ;
+
+  private final Overseer.Stats stats;
   
   public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
+    this(zookeeper, dir, acl, new Overseer.Stats());
+  }
+
+  public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl, Overseer.Stats stats) {
     this.dir = dir;
-    
+
     ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
     try {
       cmdExecutor.ensureExists(dir, zookeeper);
@@ -65,12 +72,12 @@ public class DistributedQueue {
       Thread.currentThread().interrupt();
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
     }
-    
+
     if (acl != null) {
       this.acl = acl;
     }
     this.zookeeper = zookeeper;
-    
+    this.stats = stats;
   }
   
   /**
@@ -155,25 +162,30 @@ public class DistributedQueue {
       InterruptedException {
     TreeMap<Long,String> orderedChildren;
     // Same as for element. Should refactor this.
-    while (true) {
-      try {
-        orderedChildren = orderedChildren(null);
-      } catch (KeeperException.NoNodeException e) {
-        throw new NoSuchElementException();
-      }
-      if (orderedChildren.size() == 0) throw new NoSuchElementException();
-      
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
+    TimerContext time = stats.time(dir + "_remove");
+    try {
+      while (true) {
         try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          zookeeper.delete(path, -1, true);
-          return data;
+          orderedChildren = orderedChildren(null);
         } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+          throw new NoSuchElementException();
+        }
+        if (orderedChildren.size() == 0) throw new NoSuchElementException();
+
+        for (String headNode : orderedChildren.values()) {
+          String path = dir + "/" + headNode;
+          try {
+            byte[] data = zookeeper.getData(path, null, null, true);
+            zookeeper.delete(path, -1, true);
+            return data;
+          } catch (KeeperException.NoNodeException e) {
+            // Another client deleted the node first.
+          }
         }
+
       }
-      
+    } finally {
+      time.stop();
     }
   }
   
@@ -183,15 +195,20 @@ public class DistributedQueue {
    */
   public byte[] remove(QueueEvent event) throws KeeperException,
       InterruptedException {
-    String path = event.getId();
-    String responsePath = dir + "/" + response_prefix
-        + path.substring(path.lastIndexOf("-") + 1);
-    if (zookeeper.exists(responsePath, true)) {
-      zookeeper.setData(responsePath, event.getBytes(), true);
-    }
-    byte[] data = zookeeper.getData(path, null, null, true);
-    zookeeper.delete(path, -1, true);
-    return data;
+    TimerContext time = stats.time(dir + "_remove_event");
+    try {
+      String path = event.getId();
+      String responsePath = dir + "/" + response_prefix
+          + path.substring(path.lastIndexOf("-") + 1);
+      if (zookeeper.exists(responsePath, true)) {
+        zookeeper.setData(responsePath, event.getBytes(), true);
+      }
+      byte[] data = zookeeper.getData(path, null, null, true);
+      zookeeper.delete(path, -1, true);
+      return data;
+    } finally {
+      time.stop();
+    }
   }
   
   
@@ -235,29 +252,34 @@ public class DistributedQueue {
   public byte[] take() throws KeeperException, InterruptedException {
     TreeMap<Long,String> orderedChildren;
     // Same as for element. Should refactor this.
-    while (true) {
-      LatchChildWatcher childWatcher = new LatchChildWatcher();
-      try {
-        orderedChildren = orderedChildren(childWatcher);
-      } catch (KeeperException.NoNodeException e) {
-        zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
-        continue;
-      }
-      if (orderedChildren.size() == 0) {
-        childWatcher.await(DEFAULT_TIMEOUT);
-        continue;
-      }
-      
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
+    TimerContext timer = stats.time(dir + "_take");
+    try {
+      while (true) {
+        LatchChildWatcher childWatcher = new LatchChildWatcher();
         try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          zookeeper.delete(path, -1, true);
-          return data;
+          orderedChildren = orderedChildren(childWatcher);
         } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+          continue;
+        }
+        if (orderedChildren.size() == 0) {
+          childWatcher.await(DEFAULT_TIMEOUT);
+          continue;
+        }
+
+        for (String headNode : orderedChildren.values()) {
+          String path = dir + "/" + headNode;
+          try {
+            byte[] data = zookeeper.getData(path, null, null, true);
+            zookeeper.delete(path, -1, true);
+            return data;
+          } catch (KeeperException.NoNodeException e) {
+            // Another client deleted the node first.
+          }
         }
       }
+    } finally {
+      timer.stop();
     }
   }
   
@@ -268,8 +290,13 @@ public class DistributedQueue {
    */
   public boolean offer(byte[] data) throws KeeperException,
       InterruptedException {
-    return createData(dir + "/" + prefix, data,
-        CreateMode.PERSISTENT_SEQUENTIAL) != null;
+    TimerContext time = stats.time(dir + "_offer");
+    try {
+      return createData(dir + "/" + prefix, data,
+          CreateMode.PERSISTENT_SEQUENTIAL) != null;
+    } finally {
+      time.stop();
+    }
   }
   
   /**
@@ -298,21 +325,26 @@ public class DistributedQueue {
    */
   public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
       InterruptedException {
-    String path = createData(dir + "/" + prefix, data,
-        CreateMode.PERSISTENT_SEQUENTIAL);
-    String watchID = createData(
-        dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
-        null, CreateMode.EPHEMERAL);
-    Object lock = new Object();
-    LatchChildWatcher watcher = new LatchChildWatcher(lock);
-    synchronized (lock) {
-      if (zookeeper.exists(watchID, watcher, true) != null) {
-        watcher.await(timeout);
+    TimerContext time = stats.time(dir + "_offer");
+    try {
+      String path = createData(dir + "/" + prefix, data,
+          CreateMode.PERSISTENT_SEQUENTIAL);
+      String watchID = createData(
+          dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
+          null, CreateMode.EPHEMERAL);
+      Object lock = new Object();
+      LatchChildWatcher watcher = new LatchChildWatcher(lock);
+      synchronized (lock) {
+        if (zookeeper.exists(watchID, watcher, true) != null) {
+          watcher.await(timeout);
+        }
       }
+      byte[] bytes = zookeeper.getData(watchID, null, null, true);
+      zookeeper.delete(watchID, -1, true);
+      return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
+    } finally {
+      time.stop();
     }
-    byte[] bytes = zookeeper.getData(watchID, null, null, true);
-    zookeeper.delete(watchID, -1, true);
-    return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
   }
   
   /**
@@ -322,9 +354,14 @@ public class DistributedQueue {
    * @return data at the first element of the queue, or null.
    */
   public byte[] peek() throws KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_peek");
+    try {
       QueueEvent element = element();
-      if(element == null) return null;
+      if (element == null) return null;
       return element.getBytes();
+    } finally {
+      time.stop();
+    }
   }
   
   public static class QueueEvent {
@@ -399,38 +436,48 @@ public class DistributedQueue {
    * @return data at the first element of the queue, or null.
    */
   public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
-    if (wait == 0) {
-      return element();
+    TimerContext time = null;
+    if (wait == Long.MAX_VALUE) {
+      time = stats.time(dir + "_peek_wait_forever");
+    } else {
+      time = stats.time(dir + "_peek_wait" + wait);
     }
-
-    TreeMap<Long,String> orderedChildren;
-    boolean waitedEnough = false;
-    while (true) {
-      LatchChildWatcher childWatcher = new LatchChildWatcher();
-      try {
-        orderedChildren = orderedChildren(childWatcher);
-      } catch (KeeperException.NoNodeException e) {
-        zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
-        continue;
-      }
-      if(waitedEnough) {
-        if(orderedChildren.isEmpty()) return null;
-      }
-      if (orderedChildren.size() == 0) {
-        childWatcher.await(wait == Long.MAX_VALUE ?  DEFAULT_TIMEOUT: wait);
-        waitedEnough = wait != Long.MAX_VALUE;
-        continue;
+    try {
+      if (wait == 0) {
+        return element();
       }
 
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
+      TreeMap<Long, String> orderedChildren;
+      boolean waitedEnough = false;
+      while (true) {
+        LatchChildWatcher childWatcher = new LatchChildWatcher();
         try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          return new QueueEvent(path, data, childWatcher.getWatchedEvent());
+          orderedChildren = orderedChildren(childWatcher);
         } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+          continue;
+        }
+        if (waitedEnough) {
+          if (orderedChildren.isEmpty()) return null;
+        }
+        if (orderedChildren.size() == 0) {
+          childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
+          waitedEnough = wait != Long.MAX_VALUE;
+          continue;
+        }
+
+        for (String headNode : orderedChildren.values()) {
+          String path = dir + "/" + headNode;
+          try {
+            byte[] data = zookeeper.getData(path, null, null, true);
+            return new QueueEvent(path, data, childWatcher.getWatchedEvent());
+          } catch (KeeperException.NoNodeException e) {
+            // Another client deleted the node first.
+          }
         }
       }
+    } finally {
+      time.stop();
     }
   }
   
@@ -441,11 +488,17 @@ public class DistributedQueue {
    * @return Head of the queue or null.
    */
   public byte[] poll() throws KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_poll");
     try {
       return remove();
     } catch (NoSuchElementException e) {
       return null;
+    } finally {
+      time.stop();
     }
   }
-  
+
+  public Overseer.Stats getStats() {
+    return stats;
+  }
 }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1580466&r1=1580465&r2=1580466&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Sun Mar 23 07:32:20 2014
@@ -27,12 +27,16 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClosableThread;
 import org.apache.solr.common.cloud.ClusterState;
@@ -47,6 +51,9 @@ import org.apache.solr.common.cloud.ZkCo
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.util.stats.Clock;
+import org.apache.solr.util.stats.Timer;
+import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -62,8 +69,11 @@ public class Overseer {
   public static final String REMOVESHARD = "removeshard";
   public static final String ADD_ROUTING_RULE = "addroutingrule";
   public static final String REMOVE_ROUTING_RULE = "removeroutingrule";
+  public static final String STATE = "state";
 
   public static final int STATE_UPDATE_DELAY = 1500;  // delay between cloud state updates
+  public static final String CREATESHARD = "createshard";
+  public static final String UPDATESHARDSTATE = "updateshardstate";
 
   private static Logger log = LoggerFactory.getLogger(Overseer.class);
 
@@ -88,13 +98,16 @@ public class Overseer {
     // Internal map which holds the information about failed tasks.
     private final DistributedMap failureMap;
 
+    private final Stats zkStats;
+
     private Map clusterProps;
     private boolean isClosed = false;
 
-    public ClusterStateUpdater(final ZkStateReader reader, final String myId) {
+    public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
       this.zkClient = reader.getZkClient();
-      this.stateUpdateQueue = getInQueue(zkClient);
-      this.workQueue = getInternalQueue(zkClient);
+      this.zkStats = zkStats;
+      this.stateUpdateQueue = getInQueue(zkClient, zkStats);
+      this.workQueue = getInternalQueue(zkClient, zkStats);
       this.failureMap = getFailureMap(zkClient);
       this.runningMap = getRunningMap(zkClient);
       this.completedMap = getCompletedMap(zkClient);
@@ -102,6 +115,14 @@ public class Overseer {
       this.reader = reader;
       clusterProps = reader.getClusterProps();
     }
+
+    public Stats getStateUpdateQueueStats() {
+      return stateUpdateQueue.getStats();
+    }
+
+    public Stats getWorkQueueStats()  {
+      return workQueue.getStats();
+    }
     
     @Override
     public void run() {
@@ -133,8 +154,10 @@ public class Overseer {
                 else if (LeaderStatus.YES == isLeader) {
                   final ZkNodeProps message = ZkNodeProps.load(head);
                   final String operation = message.getStr(QUEUE_OPERATION);
+                  final TimerContext timerContext = stats.time(operation);
                   try {
                     clusterState = processMessage(clusterState, message, operation);
+                    stats.success(operation);
                   } catch (Exception e) {
                     // generally there is nothing we can do - in most cases, we have
                     // an issue that will fail again on retry or we cannot communicate with
@@ -142,6 +165,9 @@ public class Overseer {
                     // TODO: if ordering for the message is not important, we could
                     // track retries and put it back on the end of the queue
                     log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
+                    stats.error(operation);
+                  } finally {
+                    timerContext.stop();
                   }
                   zkClient.setData(ZkStateReader.CLUSTER_STATE,
                       ZkStateReader.toJSON(clusterState), true);
@@ -208,8 +234,10 @@ public class Overseer {
             while (head != null) {
               final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
               final String operation = message.getStr(QUEUE_OPERATION);
+              final TimerContext timerContext = stats.time(operation);
               try {
                 clusterState = processMessage(clusterState, message, operation);
+                stats.success(operation);
               } catch (Exception e) {
                 // generally there is nothing we can do - in most cases, we have
                 // an issue that will fail again on retry or we cannot communicate with
@@ -217,6 +245,9 @@ public class Overseer {
                 // TODO: if ordering for the message is not important, we could
                 // track retries and put it back on the end of the queue
                 log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
+                stats.error(operation);
+              } finally {
+                timerContext.stop();
               }
               workQueue.offer(head.getBytes());
 
@@ -253,7 +284,7 @@ public class Overseer {
 
     private ClusterState processMessage(ClusterState clusterState,
         final ZkNodeProps message, final String operation) {
-      if ("state".equals(operation)) {
+      if (STATE.equals(operation)) {
         if( isLegacy( clusterProps )) {
           clusterState = updateState(clusterState, message);
         } else {
@@ -279,9 +310,9 @@ public class Overseer {
             message.getStr(ZkStateReader.SHARD_ID_PROP),
             sb.length() > 0 ? sb.toString() : null);
 
-      } else if ("createshard".equals(operation)) {
+      } else if (CREATESHARD.equals(operation)) {
         clusterState = createShard(clusterState, message);
-      } else if ("updateshardstate".equals(operation))  {
+      } else if (UPDATESHARDSTATE.equals(operation))  {
         clusterState = updateShardState(clusterState, message);
       } else if (OverseerCollectionProcessor.CREATECOLLECTION.equals(operation)) {
          clusterState = buildCollection(clusterState, message);
@@ -1013,7 +1044,7 @@ public class Overseer {
       public boolean isClosed() {
         return this.isClosed;
       }
-    
+
   }
 
   static void getShardNames(Integer numShards, List<String> shardNames) {
@@ -1077,11 +1108,15 @@ public class Overseer {
   private String adminPath;
 
   private OverseerCollectionProcessor ocp;
+
+  private Stats stats;
+
   // overseer not responsible for closing reader
   public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
     this.reader = reader;
     this.shardHandler = shardHandler;
     this.adminPath = adminPath;
+    this.stats = new Stats();
   }
   
   public void start(String id) {
@@ -1090,12 +1125,12 @@ public class Overseer {
     createOverseerNode(reader.getZkClient());
     //launch cluster state updater thread
     ThreadGroup tg = new ThreadGroup("Overseer state updater.");
-    updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id));
+    updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id, stats));
     updaterThread.setDaemon(true);
 
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
 
-    ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath);
+    ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
     ccThread = new OverseerThread(ccTg, ocp, "Overseer-" + id);
     ccThread.setDaemon(true);
     
@@ -1136,14 +1171,18 @@ public class Overseer {
    * Get queue that can be used to send messages to Overseer.
    */
   public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
+    return getInQueue(zkClient, new Stats());
+  }
+
+  static DistributedQueue getInQueue(final SolrZkClient zkClient, Stats zkStats)  {
     createOverseerNode(zkClient);
-    return new DistributedQueue(zkClient, "/overseer/queue", null);
+    return new DistributedQueue(zkClient, "/overseer/queue", null, zkStats);
   }
 
   /* Internal queue, not to be used outside of Overseer */
-  static DistributedQueue getInternalQueue(final SolrZkClient zkClient) {
+  static DistributedQueue getInternalQueue(final SolrZkClient zkClient, Stats zkStats) {
     createOverseerNode(zkClient);
-    return new DistributedQueue(zkClient, "/overseer/queue-work", null);
+    return new DistributedQueue(zkClient, "/overseer/queue-work", null, zkStats);
   }
 
   /* Internal map for failed tasks, not to be used outside of the Overseer */
@@ -1166,8 +1205,12 @@ public class Overseer {
   
   /* Collection creation queue */
   static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
+    return getCollectionQueue(zkClient, new Stats());
+  }
+
+  static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats)  {
     createOverseerNode(zkClient);
-    return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null);
+    return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null, zkStats);
   }
   
   private static void createOverseerNode(final SolrZkClient zkClient) {
@@ -1192,4 +1235,109 @@ public class Overseer {
     return reader;
   }
 
+  /**
+   * Used to hold statistics about overseer operations. It will be exposed
+   * to the OverseerCollectionProcessor to return statistics.
+   *
+   * This is experimental API and subject to change.
+   */
+  public static class Stats {
+    static final int MAX_STORED_FAILURES = 10;
+
+    final Map<String, Stat> stats = Collections.synchronizedMap(new HashMap<String, Stat>());
+
+    public Map<String, Stat> getStats() {
+      return stats;
+    }
+
+    public int getSuccessCount(String operation) {
+      Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+      return stat == null ? 0 : stat.success.get();
+    }
+
+    public int getErrorCount(String operation)  {
+      Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+      return stat == null ? 0 : stat.errors.get();
+    }
+
+    public void success(String operation) {
+      String op = operation.toLowerCase(Locale.ROOT);
+      Stat stat = stats.get(op);
+      if (stat == null) {
+        stat = new Stat();
+        stats.put(op, stat);
+      }
+      stat.success.incrementAndGet();
+    }
+
+    public void error(String operation) {
+      String op = operation.toLowerCase(Locale.ROOT);
+      Stat stat = stats.get(op);
+      if (stat == null) {
+        stat = new Stat();
+        stats.put(op, stat);
+      }
+      stat.errors.incrementAndGet();
+    }
+
+    public TimerContext time(String operation) {
+      String op = operation.toLowerCase(Locale.ROOT);
+      Stat stat = stats.get(op);
+      if (stat == null) {
+        stat = new Stat();
+        stats.put(op, stat);
+      }
+      return stat.requestTime.time();
+    }
+
+    public void storeFailureDetails(String operation, ZkNodeProps request, SolrResponse resp) {
+      String op = operation.toLowerCase(Locale.ROOT);
+      Stat stat = stats.get(op);
+      if (stat == null) {
+        stat = new Stat();
+        stats.put(op, stat);
+      }
+      LinkedList<FailedOp> failedOps = stat.failureDetails;
+      synchronized (failedOps)  {
+        if (failedOps.size() >= MAX_STORED_FAILURES)  {
+          failedOps.removeFirst();
+        }
+        failedOps.addLast(new FailedOp(request, resp));
+      }
+    }
+
+    public List<FailedOp> getFailureDetails(String operation) {
+      Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+      if (stat == null || stat.failureDetails.isEmpty()) return null;
+      LinkedList<FailedOp> failedOps = stat.failureDetails;
+      synchronized (failedOps)  {
+        ArrayList<FailedOp> ret = new ArrayList<>(failedOps);
+        return ret;
+      }
+    }
+  }
+
+  public static class Stat  {
+    public final AtomicInteger success;
+    public final AtomicInteger errors;
+    public final Timer requestTime;
+    public final LinkedList<FailedOp> failureDetails;
+
+    public Stat() {
+      this.success = new AtomicInteger();
+      this.errors = new AtomicInteger();
+      this.requestTime = new Timer(TimeUnit.MILLISECONDS, TimeUnit.MINUTES, Clock.defaultClock());
+      this.failureDetails = new LinkedList<>();
+    }
+  }
+
+  public static class FailedOp  {
+    public final ZkNodeProps req;
+    public final SolrResponse resp;
+
+    public FailedOp(ZkNodeProps req, SolrResponse resp) {
+      this.req = req;
+      this.resp = resp;
+    }
+  }
 }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1580466&r1=1580465&r2=1580466&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Sun Mar 23 07:32:20 2014
@@ -60,6 +60,9 @@ import org.apache.solr.handler.component
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.stats.Snapshot;
+import org.apache.solr.util.stats.Timer;
+import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -90,6 +93,7 @@ import static org.apache.solr.common.clo
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
 
 
@@ -167,15 +171,18 @@ public class OverseerCollectionProcessor
   private ZkStateReader zkStateReader;
 
   private boolean isClosed;
-  
-  public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
-    this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
+
+  private Overseer.Stats stats;
+
+  public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, Overseer.Stats stats) {
+    this(zkStateReader, myId, shardHandler, adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
         Overseer.getRunningMap(zkStateReader.getZkClient()),
         Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
   }
 
   protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
                                         String adminPath,
+                                        Overseer.Stats stats,
                                         DistributedQueue workQueue,
                                         DistributedMap runningMap,
                                         DistributedMap completedMap,
@@ -188,6 +195,7 @@ public class OverseerCollectionProcessor
     this.runningMap = runningMap;
     this.completedMap = completedMap;
     this.failureMap = failureMap;
+    this.stats = stats;
   }
   
   @Override
@@ -232,7 +240,13 @@ public class OverseerCollectionProcessor
 
            log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
            final String operation = message.getStr(QUEUE_OPERATION);
-           SolrResponse response = processMessage(message, operation);
+           final TimerContext timerContext = stats.time("collection_" + operation); // even if operation is async, it is sync!
+           SolrResponse response = null;
+           try  {
+             response = processMessage(message, operation);
+           } finally {
+             timerContext.stop();
+           }
 
            head.setBytes(SolrResponse.serializable(response));
            if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
@@ -247,6 +261,13 @@ public class OverseerCollectionProcessor
 
            workQueue.remove(head);
 
+           if (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)  {
+             stats.error("collection_" + operation);
+             stats.storeFailureDetails("collection_" + operation, message, response);
+           } else {
+             stats.success("collection_" + operation);
+           }
+
           log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
         } catch (KeeperException e) {
           if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -456,7 +477,11 @@ public class OverseerCollectionProcessor
         addReplica(zkStateReader.getClusterState(), message, results);
       } else if (REQUESTSTATUS.equals(operation)) {
         requestStatus(message, results);
-      } else {
+      } else if (OVERSEERSTATUS.isEqual(operation)) {
+        getOverseerStatus(message, results);
+      }
+
+      else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
       }
@@ -474,6 +499,79 @@ public class OverseerCollectionProcessor
     return new OverseerSolrResponse(results);
   }
 
+  private void getOverseerStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+    String leaderNode = getLeaderNode(zkStateReader.getZkClient());
+    results.add("leader", leaderNode);
+    Stat stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
+    results.add("overseer_queue_size", stat.getNumChildren());
+    stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/queue-work",null, stat, true);
+    results.add("overseer_work_queue_size", stat.getNumChildren());
+    stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/collection-queue-work",null, stat, true);
+    results.add("overseer_collection_queue_size", stat.getNumChildren());
+
+    NamedList overseerStats = new NamedList();
+    NamedList collectionStats = new NamedList();
+    NamedList stateUpdateQueueStats = new NamedList();
+    NamedList workQueueStats = new NamedList();
+    NamedList collectionQueueStats = new NamedList();
+    for (Map.Entry<String, Overseer.Stat> entry : this.stats.getStats().entrySet()) {
+      String key = entry.getKey();
+      NamedList<Object> lst = new SimpleOrderedMap<>();
+      if (key.startsWith("collection_"))  {
+        collectionStats.add(key.substring(11), lst);
+        int successes = this.stats.getSuccessCount(entry.getKey());
+        int errors = this.stats.getErrorCount(entry.getKey());
+        lst.add("requests", successes);
+        lst.add("errors", errors);
+        List<Overseer.FailedOp> failureDetails = this.stats.getFailureDetails(key);
+        if (failureDetails != null) {
+          List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
+          for (Overseer.FailedOp failedOp : failureDetails) {
+            SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
+            fail.add("request", failedOp.req.getProperties());
+            fail.add("response", failedOp.resp.getResponse());
+            failures.add(fail);
+          }
+          lst.add("recent_failures", failures);
+        }
+      } else if (key.startsWith("/overseer/queue_"))  {
+        stateUpdateQueueStats.add(key.substring(16), lst);
+      } else if (key.startsWith("/overseer/queue-work_"))  {
+        workQueueStats.add(key.substring(21), lst);
+      } else if (key.startsWith("/overseer/collection-queue-work_"))  {
+        collectionQueueStats.add(key.substring(32), lst);
+      } else  {
+        // overseer stats
+        overseerStats.add(key, lst);
+        int successes = this.stats.getSuccessCount(entry.getKey());
+        int errors = this.stats.getErrorCount(entry.getKey());
+        lst.add("requests", successes);
+        lst.add("errors", errors);
+      }
+      Timer timer = entry.getValue().requestTime;
+      Snapshot snapshot = timer.getSnapshot();
+      lst.add("totalTime", timer.getSum());
+      lst.add("avgRequestsPerMinute", timer.getMeanRate());
+      lst.add("5minRateReqsPerMinute", timer.getFiveMinuteRate());
+      lst.add("15minRateReqsPerMinute", timer.getFifteenMinuteRate());
+      lst.add("avgTimePerRequest", timer.getMean());
+      lst.add("medianRequestTime", snapshot.getMedian());
+      lst.add("75thPcRequestTime", snapshot.get75thPercentile());
+      lst.add("95thPcRequestTime", snapshot.get95thPercentile());
+      lst.add("99thPcRequestTime", snapshot.get99thPercentile());
+      lst.add("999thPcRequestTime", snapshot.get999thPercentile());
+    }
+    results.add("overseer_operations", overseerStats);
+    results.add("collection_operations", collectionStats);
+    results.add("overseer_queue", stateUpdateQueueStats);
+    results.add("overseer_internal_queue", workQueueStats);
+    results.add("collection_queue", collectionQueueStats);
+
+  }
+
   private void processRoleCommand(ZkNodeProps message, String operation) throws KeeperException, InterruptedException {
     SolrZkClient zkClient = zkStateReader.getZkClient();
     Map roles = null;

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1580466&r1=1580465&r2=1580466&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
(original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
Sun Mar 23 07:32:20 2014
@@ -34,6 +34,7 @@ import static org.apache.solr.common.clo
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
 
 import java.io.IOException;
@@ -206,6 +207,10 @@ public class CollectionsHandler extends 
         this.handleRequestStatus(req, rsp);
         break;
       }
+      case OVERSEERSTATUS:  {
+        this.handleOverseerStatus(req, rsp);
+        break;
+      }
       default: {
           throw new RuntimeException("Unknown action: " + action);
       }
@@ -214,6 +219,12 @@ public class CollectionsHandler extends 
     rsp.setHttpCaching(false);
   }
 
+  private void handleOverseerStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    Map<String, Object> props = ZkNodeProps.makeMap(
+        Overseer.QUEUE_OPERATION, OVERSEERSTATUS.toLower());
+    handleResponse(OVERSEERSTATUS.toLower(), new ZkNodeProps(props), rsp);
+  }
+
   private void handleProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     req.getParams().required().check("name");
     String name = req.getParams().get("name");

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1580466&r1=1580465&r2=1580466&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
(original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
Sun Mar 23 07:32:20 2014
@@ -96,7 +96,7 @@ public class OverseerCollectionProcessor
         DistributedQueue workQueue, DistributedMap runningMap,
         DistributedMap completedMap,
         DistributedMap failureMap) {
-      super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap);
+      super(zkStateReader, myId, shardHandler, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
     }
     
     @Override

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1580466&r1=1580465&r2=1580466&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
(original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
Sun Mar 23 07:32:20 2014
@@ -43,6 +43,7 @@ import org.apache.solr.common.cloud.Slic
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.handler.admin.CollectionsHandler;
 import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.zookeeper.CreateMode;
@@ -912,7 +913,7 @@ public class OverseerTest extends SolrTe
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
       //prepopulate work queue with some items to emulate previous overseer died before persisting state
-      DistributedQueue queue = Overseer.getInternalQueue(zkClient);
+      DistributedQueue queue = Overseer.getInternalQueue(zkClient, new Overseer.Stats());
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
           ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
           ZkStateReader.NODE_NAME_PROP, "node1",

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1580466&r1=1580465&r2=1580466&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
(original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
Sun Mar 23 07:32:20 2014
@@ -43,7 +43,8 @@ public interface CollectionParams 
     REMOVEROLE,
     CLUSTERPROP,
     REQUESTSTATUS,
-    ADDREPLICA;
+    ADDREPLICA,
+    OVERSEERSTATUS;
     
     public static CollectionAction get( String p )
     {



Mime
View raw message