lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From no...@apache.org
Subject lucene-solr:branch_6x: SOLR-8744: Overseer operations performed with fine grained mutual exclusion
Date Thu, 02 Jun 2016 09:25:20 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 36ee71320 -> 734dcb99f


SOLR-8744: Overseer operations performed with fine grained mutual exclusion


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/734dcb99
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/734dcb99
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/734dcb99

Branch: refs/heads/branch_6x
Commit: 734dcb99fcdd557ef046166193ae804824023db3
Parents: 36ee713
Author: Noble Paul <noble@apache.org>
Authored: Thu Jun 2 14:47:57 2016 +0530
Committer: Noble Paul <noble@apache.org>
Committed: Thu Jun 2 14:49:06 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../java/org/apache/solr/cloud/LockTree.java    | 182 +++++++++++++++++++
 .../OverseerCollectionConfigSetProcessor.java   |  17 +-
 .../cloud/OverseerCollectionMessageHandler.java |  55 +++++-
 .../cloud/OverseerConfigSetMessageHandler.java  |  35 ++--
 .../solr/cloud/OverseerMessageHandler.java      |  33 +---
 .../solr/cloud/OverseerTaskProcessor.java       |  88 ++++-----
 .../apache/solr/cloud/MultiThreadedOCPTest.java |  35 ++--
 .../org/apache/solr/cloud/TestLockTree.java     | 130 +++++++++++++
 .../solr/common/params/CollectionParams.java    | 127 ++++++++-----
 .../org/apache/solr/common/util/StrUtils.java   |   3 +-
 11 files changed, 536 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2d27e07..e7a8382 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -244,6 +244,8 @@ Optimizations
 
 * SOLR-9147: Upgrade commons-io to 2.5, avoid expensive array resizing in EmbeddedSolrServer (Mikhail Khludnev)
 
+* SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum)
+
 Other Changes
 ----------------------
 * SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/core/src/java/org/apache/solr/cloud/LockTree.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LockTree.java b/solr/core/src/java/org/apache/solr/cloud/LockTree.java
new file mode 100644
index 0000000..d629d1c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/LockTree.java
@@ -0,0 +1,191 @@
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.OverseerMessageHandler.Lock;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CollectionParams.LockLevel;
+import org.apache.solr.common.util.StrUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This is a utility class that offers fine-grained locking for various Collection Operations.
+ * All reads and writes of the lock state synchronize on the LockTree instance, so it is safe
+ * for multiple threads to use, but only one thread can perform such an operation at a time.
+ */
+public class LockTree {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final Node root = new Node(null, LockLevel.CLUSTER, null);
+
+  public void clear() {
+    synchronized (this) {
+      root.clear();
+    }
+  }
+
+  private class LockImpl implements Lock {
+    final Node node;
+
+    LockImpl(Node node) {
+      this.node = node;
+    }
+
+    @Override
+    public void unlock() {
+      synchronized (LockTree.this) {
+        node.unlock(this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return StrUtils.join(node.constructPath(new LinkedList<>()), '/');
+    }
+  }
+
+
+  public class Session {
+    private SessionNode root = new SessionNode(LockLevel.CLUSTER);
+
+    public Lock lock(CollectionParams.CollectionAction action, List<String> path) {
+      synchronized (LockTree.this) {
+        if (action.lockLevel == LockLevel.NONE) return FREELOCK;
+        if (root.isBusy(action.lockLevel, path)) return null;
+        Lock lockObject = LockTree.this.root.lock(action.lockLevel, path);
+        if (lockObject == null) root.markBusy(path, 0);
+        return lockObject;
+      }
+    }
+  }
+
+  private static class SessionNode {
+    final LockLevel level;
+    Map<String, SessionNode> kids;
+    boolean busy = false;
+
+    SessionNode(LockLevel level) {
+      this.level = level;
+    }
+
+    void markBusy(List<String> path, int depth) {
+      if (path.size() == depth) {
+        busy = true;
+      } else {
+        String s = path.get(depth);
+        if (kids == null) kids = new HashMap<>();
+        SessionNode node = kids.get(s);
+        if (node == null) kids.put(s, node = new SessionNode(level.getChild()));
+        node.markBusy(path, depth + 1);
+      }
+    }
+
+    boolean isBusy(LockLevel lockLevel, List<String> path) {
+      if (lockLevel.isHigherOrEqual(level)) {
+        if (busy) return true;
+        //the requested path ends at this node: a task marked busy anywhere below it also makes this one busy
+        if (path.size() <= level.level) return isAnyKidBusy();
+        String s = path.get(level.level);
+        if (kids == null || kids.get(s) == null) return false;
+        return kids.get(s).isBusy(lockLevel, path);
+      } else {
+        return false;
+      }
+    }
+
+    //true if any node below this one was marked busy in this session
+    boolean isAnyKidBusy() {
+      if (kids == null) return false;
+      for (SessionNode kid : kids.values()) if (kid.busy || kid.isAnyKidBusy()) return true;
+      return false;
+    }
+  }
+
+  public Session getSession() {
+    return new Session();
+  }
+
+  private class Node {
+    final String name;
+    final Node mom;
+    final LockLevel level;
+    HashMap<String, Node> children = new HashMap<>();
+    LockImpl myLock;
+
+    Node(String name, LockLevel level, Node mom) {
+      this.name = name;
+      this.level = level;
+      this.mom = mom;
+    }
+
+    //if this or any of its children are locked
+    boolean isLocked() {
+      if (myLock != null) return true;
+      for (Node node : children.values()) if (node.isLocked()) return true;
+      return false;
+    }
+
+
+    void unlock(LockImpl lockObject) {
+      if (myLock == lockObject) myLock = null;
+      else {
+        LOG.info("Unlocked multiple times : {}", lockObject.toString());
+      }
+    }
+
+
+    Lock lock(LockLevel lockLevel, List<String> path) {
+      if (myLock != null) return null;//I'm already locked. no need to go any further
+      if (lockLevel == level) {
+        //lock is supposed to be acquired at this level
+        //If I am locked or any of my children or grandchildren are locked
+        // it is not possible to acquire a lock
+        if (isLocked()) return null;
+        return myLock = new LockImpl(this);
+      } else {
+        String childName = path.get(level.level);
+        Node child = children.get(childName);
+        if (child == null)
+          children.put(childName, child = new Node(childName, LockLevel.getLevel(level.level + 1), this));
+        return child.lock(lockLevel, path);
+      }
+    }
+
+    LinkedList<String> constructPath(LinkedList<String> collect) {
+      if (name != null) collect.addFirst(name);
+      if (mom != null) mom.constructPath(collect);
+      return collect;
+    }
+
+    void clear() {
+      if (myLock != null) {
+        LOG.warn("lock_is_leaked at {}", constructPath(new LinkedList<>()));
+        myLock = null;
+      }
+      for (Node node : children.values()) node.clear();
+    }
+  }
+  static final Lock FREELOCK = () -> {};
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
index f1d0ab2..f8f8446 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
@@ -16,10 +16,10 @@
   */
  package org.apache.solr.cloud;
 
-import org.apache.solr.common.cloud.ZkNodeProps;
  import org.apache.solr.common.cloud.ZkStateReader;
  import org.apache.solr.handler.component.ShardHandler;
  import org.apache.solr.handler.component.ShardHandlerFactory;
+
  import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
 
  /**
@@ -61,8 +61,6 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
     super(
         zkStateReader,
         myId,
-        shardHandlerFactory,
-        adminPath,
         stats,
         getOverseerMessageHandlerSelector(zkStateReader, myId, shardHandlerFactory,
             adminPath, stats, overseer, overseerNodePrioritizer),
@@ -85,15 +83,12 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
         zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer);
     final OverseerConfigSetMessageHandler configMessageHandler = new OverseerConfigSetMessageHandler(
         zkStateReader);
-    return new OverseerMessageHandlerSelector() {
-      @Override
-      public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) {
-        String operation = message.getStr(Overseer.QUEUE_OPERATION);
-        if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
-          return configMessageHandler;
-        }
-        return collMessageHandler;
+    return message -> {
+      String operation = message.getStr(Overseer.QUEUE_OPERATION);
+      if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
+        return configMessageHandler;
       }
+      return collMessageHandler;
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index ed23e77..54c0697 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -179,7 +179,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
-  // Set that tracks collections that are currently being processed by a running task.
-  // This is used for handling mutual exclusion of the tasks.
-  final private Set collectionWip;
+  // Tree of locks held by the tasks that are currently running.
+  // This is used for handling mutual exclusion of the tasks.
+
+  final private LockTree lockTree = new LockTree();
 
   static final Random RANDOM;
   static {
@@ -206,7 +207,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     this.stats = stats;
     this.overseer = overseer;
     this.overseerPrioritizer = overseerPrioritizer;
-    this.collectionWip = new HashSet();
   }
 
   @Override
@@ -216,10 +216,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
     NamedList results = new NamedList();
     try {
-      CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
-      if (action == null) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
-      }
+      CollectionParams.CollectionAction action = getCollectionAction(operation);
       switch (action) {
         case CREATE:
           createCollection(zkStateReader.getClusterState(), message, results);
@@ -287,6 +284,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
         case RESTORE:
           processRestoreAction(message, results);
           break;
+        case MOCK_COLL_TASK:
+        case MOCK_SHARD_TASK:
+        case MOCK_REPLICA_TASK: {
+          //only for test purposes
+          Thread.sleep(message.getInt("sleep", 1));
+          break;
+        }
         default:
           throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
               + operation);
@@ -311,6 +315,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     return new OverseerSolrResponse(results);
   }
 
+  private CollectionParams.CollectionAction getCollectionAction(String operation) {
+    CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
+    if (action == null) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
+    }
+    return action;
+  }
+
   //
   // TODO DWS: this class has gone out of control (too big); refactor to break it up
   //
@@ -2663,7 +2675,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       message.getStr(COLLECTION_PROP) : message.getStr(NAME);
   }
 
-  @Override
+
+ /* @Override
   public void markExclusiveTask(String collectionName, ZkNodeProps message) {
     if (collectionName != null) {
       synchronized (collectionWip) {
@@ -2679,8 +2692,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
         collectionWip.remove(collectionName);
       }
     }
-  }
-
+  }*/
+/*
   @Override
   public ExclusiveMarking checkExclusiveMarking(String collectionName, ZkNodeProps message) {
     synchronized (collectionWip) {
@@ -2689,5 +2702,31 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }
 
     return ExclusiveMarking.NOTDETERMINED;
+  }*/
+
+  private long sessionId = -1;
+  private LockTree.Session lockSession;
+
+  @Override
+  public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
+    if (lockSession == null || sessionId != taskBatch.getId()) {
+      //this is always called in the same thread.
+      //Each batch of tasks gets a new batch id,
+      //so if the batch id changes we must create a new Session.
+      // Also, if no tasks are running, clear the lockTree:
+      // this ensures that locks are not 'leaked'.
+      if(taskBatch.getRunningTasks() == 0) lockTree.clear();
+      lockSession = lockTree.getSession();
+      //remember the batch this session belongs to, so it is reused for the rest of the batch
+      sessionId = taskBatch.getId();
+    }
+    return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
+        Arrays.asList(
+            getTaskKey(message),
+            message.getStr(ZkStateReader.SHARD_ID_PROP),
+            message.getStr(ZkStateReader.REPLICA_PROP))
+
+    );
   }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
index ba8f129..2f2859f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
@@ -45,8 +45,6 @@ import org.noggit.JSONUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NONEXCLUSIVE;
-import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NOTDETERMINED;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.params.ConfigSetParams.ConfigSetAction.CREATE;
 
@@ -147,12 +145,22 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
   }
 
   @Override
+  public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
+    String configSetName = getTaskKey(message);
+    if (canExecute(configSetName, message)) {
+      markExclusiveTask(configSetName, message);
+      return () -> unmarkExclusiveTask(configSetName, message);
+    }
+    return null;
+  }
+
+  @Override
   public String getTaskKey(ZkNodeProps message) {
     return message.getStr(NAME);
   }
 
-  @Override
-  public void markExclusiveTask(String configSetName, ZkNodeProps message) {
+
+  private void markExclusiveTask(String configSetName, ZkNodeProps message) {
     String baseConfigSet = getBaseConfigSetIfCreate(message);
     markExclusive(configSetName, baseConfigSet);
   }
@@ -164,8 +172,7 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
     }
   }
 
-  @Override
-  public void unmarkExclusiveTask(String configSetName, String operation, ZkNodeProps message) {
+  private void unmarkExclusiveTask(String configSetName, ZkNodeProps message) {
     String baseConfigSet = getBaseConfigSetIfCreate(message);
     unmarkExclusiveConfigSet(configSetName, baseConfigSet);
   }
@@ -177,28 +184,26 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
     }
   }
 
-  @Override
-  public ExclusiveMarking checkExclusiveMarking(String configSetName, ZkNodeProps message) {
-    String baseConfigSet = getBaseConfigSetIfCreate(message);
-    return checkExclusiveMarking(configSetName, baseConfigSet);
-  }
 
-  private ExclusiveMarking checkExclusiveMarking(String configSetName, String baseConfigSetName) {
+  private boolean canExecute(String configSetName, ZkNodeProps message) {
+    String baseConfigSetName = getBaseConfigSetIfCreate(message);
+
     synchronized (configSetWriteWip) {
       // need to acquire:
       // 1) write lock on ConfigSet
       // 2) read lock on Base ConfigSet
       if (configSetWriteWip.contains(configSetName) || configSetReadWip.contains(configSetName)) {
-        return NONEXCLUSIVE;
+        return false;
       }
       if (baseConfigSetName != null && configSetWriteWip.contains(baseConfigSetName)) {
-        return NONEXCLUSIVE;
+        return false;
       }
     }
 
-    return NOTDETERMINED;
+    return true;
   }
 
+
   private String getBaseConfigSetIfCreate(ZkNodeProps message) {
     String operation = message.getStr(Overseer.QUEUE_OPERATION);
     if (operation != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
index 2d2408f..c4027cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
@@ -44,37 +44,23 @@ public interface OverseerMessageHandler {
     */
   String getTimerName(String operation);
 
-  /**
-   * @param message the message being processed
-   *
-   * @return the taskKey for the message for handling task exclusivity
-   */
-  String getTaskKey(ZkNodeProps message);
+  /** A lock held by a running task; {@link #unlock()} releases it. */
+  interface Lock {
+    void unlock();
+  }
 
-  /**
-   * @param taskKey the key associated with the task, cached from getTaskKey
-   * @param message the message being processed
-   */
-  void markExclusiveTask(String taskKey, ZkNodeProps message);
-  
-  /**
-   * @param taskKey the key associated with the task
-   * @param operation the operation being processed
-   * @param message the message being processed
+  /**
+   * Try to provide an exclusive lock for this particular task.
+   * Return null if locking is not possible. If locking is not necessary,
+   * return a lock whose {@link Lock#unlock()} does nothing.
    */
-  void unmarkExclusiveTask(String taskKey, String operation, ZkNodeProps message);
+  Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch);
 
   /**
-   * @param taskKey the key associated with the task
    * @param message the message being processed
    *
-   * @return the exclusive marking
+   * @return the taskKey for the message for handling task exclusivity
    */
-  ExclusiveMarking checkExclusiveMarking(String taskKey, ZkNodeProps message);
+  String getTaskKey(ZkNodeProps message);
 
-  enum ExclusiveMarking {
-    NOTDETERMINED,    // not enough context, fall back to the processor (i.e. look at running tasks)
-    EXCLUSIVE,
-    NONEXCLUSIVE
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 26a90cb..93a7e6f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -81,10 +81,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
   private String myId;
 
-  private final ShardHandlerFactory shardHandlerFactory;
-
-  private String adminPath;
-
   private ZkStateReader zkStateReader;
 
   private boolean isClosed;
@@ -102,8 +98,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
   private OverseerNodePrioritizer prioritizer;
 
   public OverseerTaskProcessor(ZkStateReader zkStateReader, String myId,
-                                        final ShardHandlerFactory shardHandlerFactory,
-                                        String adminPath,
                                         Overseer.Stats stats,
                                         OverseerMessageHandlerSelector selector,
                                         OverseerNodePrioritizer prioritizer,
@@ -113,8 +107,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
                                         DistributedMap failureMap) {
     this.zkStateReader = zkStateReader;
     this.myId = myId;
-    this.shardHandlerFactory = shardHandlerFactory;
-    this.adminPath = adminPath;
     this.stats = stats;
     this.selector = selector;
     this.prioritizer = prioritizer;
@@ -206,10 +198,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
           if (isClosed) break;
 
+          taskBatch.batchId++;
           for (QueueEvent head : heads) {
+            if (runningZKTasks.contains(head.getId())) continue;
             final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
             OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
-            String taskKey = messageHandler.getTaskKey(message);
             final String asyncId = message.getStr(ASYNC);
             if (hasLeftOverItems) {
               if (head.getId().equals(oldestItemInWorkQueue))
@@ -220,27 +213,29 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
                 continue;
               }
             }
-
-            if (!checkExclusivity(messageHandler, message, head.getId())) {
+            String operation = message.getStr(Overseer.QUEUE_OPERATION);
+            OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
+            if (lock == null) {
               log.debug("Exclusivity check failed for [{}]", message.toString());
               continue;
             }
-
             try {
-              markTaskAsRunning(messageHandler, head, taskKey, asyncId, message);
+              markTaskAsRunning(head, asyncId);
               log.debug("Marked task [{}] as running", head.getId());
             } catch (KeeperException.NodeExistsException e) {
+              lock.unlock();
               // This should never happen
               log.error("Tried to pick up task [{}] when it was already running!", head.getId());
+              continue;
             } catch (InterruptedException e) {
+              lock.unlock();
-              log.error("Thread interrupted while trying to pick task for execution.", head.getId());
+              log.error("Thread interrupted while trying to pick task [{}] for execution.", head.getId());
               Thread.currentThread().interrupt();
+              continue;
             }
-
             log.info(messageHandler.getName() + ": Get the message id:" + head.getId() + " message:" + message.toString());
-            String operation = message.getStr(Overseer.QUEUE_OPERATION);
             Runner runner = new Runner(messageHandler, message,
-                operation, head);
+                operation, head, lock);
             tpe.execute(runner);
           }
 
@@ -262,31 +257,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     }
   }
 
-  protected boolean checkExclusivity(OverseerMessageHandler messageHandler, ZkNodeProps message, String id)
-      throws KeeperException, InterruptedException {
-    String taskKey = messageHandler.getTaskKey(message);
-
-    if (taskKey == null)
-      return true;
-
-    OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message);
-    switch (marking) {
-      case NOTDETERMINED:
-        break;
-      case EXCLUSIVE:
-        return true;
-      case NONEXCLUSIVE:
-        return false;
-      default:
-        throw new IllegalArgumentException("Undefined marking: " + marking);
-    }
-
-    if (runningZKTasks.contains(id))
-      return false;
-
-    return true;
-  }
-
   private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
     synchronized (completedTasks) {
       for (String id : completedTasks.keySet()) {
@@ -390,8 +360,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
   }
 
   @SuppressWarnings("unchecked")
-  private void markTaskAsRunning(OverseerMessageHandler messageHandler, QueueEvent head, String taskKey,
-                                 String asyncId, ZkNodeProps message)
+  private void markTaskAsRunning(QueueEvent head, String asyncId)
       throws KeeperException, InterruptedException {
     synchronized (runningZKTasks) {
       runningZKTasks.add(head.getId());
@@ -401,7 +370,5 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
       runningTasks.add(head.getId());
     }
 
-    messageHandler.markExclusiveTask(taskKey, message);
-
     if (asyncId != null)
       runningMap.put(asyncId, null);
@@ -413,12 +380,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     SolrResponse response;
     QueueEvent head;
     OverseerMessageHandler messageHandler;
-  
-    public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head) {
+    private final OverseerMessageHandler.Lock lock;
+
+    public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head, OverseerMessageHandler.Lock lock) {
       this.message = message;
       this.operation = operation;
       this.head = head;
       this.messageHandler = messageHandler;
+      this.lock = lock;
       response = null;
     }
 
@@ -454,7 +423,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           log.debug("Completed task:[{}]", head.getId());
         }
 
-        markTaskComplete(messageHandler, head.getId(), asyncId, taskKey, message);
+        markTaskComplete(head.getId(), asyncId);
         log.debug("Marked task [{}] as completed.", head.getId());
         printTrackingMaps();
 
@@ -469,6 +438,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         log.warn("Resetting task {} as the thread was interrupted.", head.getId());
         Thread.currentThread().interrupt();
       } finally {
+        lock.unlock();
         if (!success) {
           // Reset task from tracking data structures so that it can be retried.
           resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
@@ -479,7 +449,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
       }
     }
 
-    private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message)
+    private void markTaskComplete(String id, String asyncId)
         throws KeeperException, InterruptedException {
       synchronized (completedTasks) {
         completedTasks.put(id, head);
@@ -494,9 +464,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
         }
       }
-
-
-      messageHandler.unmarkExclusiveTask(taskKey, operation, message);
     }
 
     private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
@@ -512,7 +479,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           runningTasks.remove(id);
         }
 
-        messageHandler.unmarkExclusiveTask(taskKey, operation, message);
       } catch (KeeperException e) {
         SolrException.log(log, "", e);
       } catch (InterruptedException e) {
@@ -568,4 +534,21 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
   }
 
+  final private TaskBatch taskBatch = new TaskBatch();
+
+  /** Identifies one pass of the processing loop over the queued tasks; batchId is bumped before each pass. */
+  public class TaskBatch {
+    private long batchId = 0;
+
+    public long getId() {
+      return batchId;
+    }
+
+    public int getRunningTasks() {
+      synchronized (runningTasks) {
+        return runningTasks.size();
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
index 1195583..c18b330 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
@@ -30,8 +30,10 @@ import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,27 +101,32 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
     }
   }
 
-  private void testTaskExclusivity() throws IOException, SolrServerException {
+  private void testTaskExclusivity() throws Exception, SolrServerException {
+
+    DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
+        "/overseer/collection-queue-work", new Overseer.Stats());
     try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
+
       Create createCollectionRequest = new Create()
               .setCollectionName("ocptest_shardsplit")
               .setNumShards(4)
               .setConfigName("conf1")
               .setAsyncId("1000");
       createCollectionRequest.process(client);
-  
-      SplitShard splitShardRequest = new SplitShard()
-              .setCollectionName("ocptest_shardsplit")
-              .setShardName(SHARD1)
-              .setAsyncId("1001");
-      splitShardRequest.process(client);
-  
-      splitShardRequest = new SplitShard()
-              .setCollectionName("ocptest_shardsplit")
-              .setShardName(SHARD2)
-              .setAsyncId("1002");
-      splitShardRequest.process(client);
-  
+
+      distributedQueue.offer(Utils.toJSON(Utils.makeMap(
+          "collection", "ocptest_shardsplit",
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
+          CommonAdminParams.ASYNC, "1001",
+          "sleep", "100"
+      )));
+      distributedQueue.offer(Utils.toJSON(Utils.makeMap(
+          "collection", "ocptest_shardsplit",
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
+          CommonAdminParams.ASYNC, "1002",
+          "sleep", "100"
+      )));
+
       int iterations = 0;
       while(true) {
         int runningTasks = 0;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
new file mode 100644
index 0000000..7e4d9b7
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
@@ -0,0 +1,130 @@
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.OverseerMessageHandler.Lock;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class TestLockTree extends SolrTestCaseJ4 {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+  public void testLocks() throws Exception {
+    LockTree lockTree = new LockTree();
+    Lock coll1Lock = lockTree.getSession().lock(CollectionAction.CREATE,
+        Arrays.asList("coll1"));
+    assertNotNull(coll1Lock);
+    assertNull("Should not be able to lock coll1/shard1", lockTree.getSession().lock(CollectionAction.BALANCESHARDUNIQUE,
+        Arrays.asList("coll1", "shard1")));
+
+    assertNull(lockTree.getSession().lock(ADDREPLICAPROP,
+        Arrays.asList("coll1", "shard1", "core_node2")));
+    coll1Lock.unlock();
+    Lock shard1Lock = lockTree.getSession().lock(CollectionAction.BALANCESHARDUNIQUE,
+        Arrays.asList("coll1", "shard1"));
+    assertNotNull(shard1Lock);
+    shard1Lock.unlock();
+    Lock replica1Lock = lockTree.getSession().lock(ADDREPLICAPROP,
+        Arrays.asList("coll1", "shard1", "core_node2"));
+    assertNotNull(replica1Lock);
+
+
+    List<Pair<CollectionAction, List<String>>> operations = new ArrayList<>();
+    operations.add(new Pair<>(ADDREPLICAPROP, Arrays.asList("coll1", "shard1", "core_node2")));
+    operations.add(new Pair<>(MODIFYCOLLECTION, Arrays.asList("coll1")));
+    operations.add(new Pair<>(SPLITSHARD, Arrays.asList("coll1", "shard1")));
+    operations.add(new Pair<>(SPLITSHARD, Arrays.asList("coll2", "shard2")));
+    operations.add(new Pair<>(MODIFYCOLLECTION, Arrays.asList("coll2")));
+    operations.add(new Pair<>(DELETEREPLICA, Arrays.asList("coll2", "shard1")));
+
+    List<Set<String>> orderOfExecution = Arrays.asList(
+        ImmutableSet.of("coll1/shard1/core_node2", "coll2/shard2"),
+        ImmutableSet.of("coll1", "coll2"),
+        ImmutableSet.of("coll1/shard1", "coll2/shard1"));
+    lockTree = new LockTree();
+    for (int counter = 0; counter < orderOfExecution.size(); counter++) {
+      LockTree.Session session = lockTree.getSession();
+      List<Pair<CollectionAction, List<String>>> completedOps = new CopyOnWriteArrayList<>();
+      List<Lock> locks = new CopyOnWriteArrayList<>();
+      List<Thread> threads = new ArrayList<>();
+      for (int i = 0; i < operations.size(); i++) {
+        Pair<CollectionAction, List<String>> operation = operations.get(i);
+        final Lock lock = session.lock(operation.first(), operation.second());
+        if (lock != null) {
+          Thread thread = new Thread(getRunnable(completedOps, operation, locks, lock));
+          threads.add(thread);
+          thread.start();
+        }
+      }
+
+
+      for (Thread thread : threads) thread.join();
+      if (locks.isEmpty())
+        throw new RuntimeException("Could not attain lock for anything " + operations);
+
+      Set<String> expectedOps = orderOfExecution.get(counter);
+      log.info("counter : {} , expected : {}, actual : {}", counter, expectedOps, locks);
+      assertEquals(expectedOps.size(), locks.size());
+      for (Lock lock : locks)
+        assertTrue("locks : " + locks + " expectedOps : " + expectedOps, expectedOps.contains(lock.toString()));
+      locks.clear();
+      for (Pair<CollectionAction, List<String>> completedOp : completedOps) {
+        operations.remove(completedOp);
+      }
+    }
+  }
+
+  private Runnable getRunnable(List<Pair<CollectionAction, List<String>>> completedOps, Pair<CollectionAction,
+      List<String>> operation, List<Lock> locks, Lock lock) {
+    return () -> {
+      try {
+        Thread.sleep(1);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      } finally {
+        lock.unlock();
+        completedOps.add(operation);
+        locks.add(lock);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index cc505f8..42cf372 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -18,65 +18,104 @@ package org.apache.solr.common.params;
 
 import java.util.Locale;
 
-public interface CollectionParams 
-{
-  /** What action **/
-  public final static String ACTION = "action";
-  public final static String NAME = "name";
-  
+public interface CollectionParams {
+  /**
+   * What action
+   **/
+  String ACTION = "action";
+  String NAME = "name";
 
 
-  public enum CollectionAction {
-    CREATE(true),
-    DELETE(true),
-    RELOAD(true),
-    SYNCSHARD(true),
-    CREATEALIAS(true),
-    DELETEALIAS(true),
-    SPLITSHARD(true),
-    DELETESHARD(true),
-    CREATESHARD(true),
-    DELETEREPLICA(true),
-    FORCELEADER(true),
-    MIGRATE(true),
-    ADDROLE(true),
-    REMOVEROLE(true),
-    CLUSTERPROP(true),
-    REQUESTSTATUS(false),
-    DELETESTATUS(false),
-    ADDREPLICA(true),
-    OVERSEERSTATUS(false),
-    LIST(false),
-    CLUSTERSTATUS(false),
-    ADDREPLICAPROP(true),
-    DELETEREPLICAPROP(true),
-    BALANCESHARDUNIQUE(true),
-    REBALANCELEADERS(true),
-    MODIFYCOLLECTION(true),
-    MIGRATESTATEFORMAT(true),
-    BACKUP(true),
-    RESTORE(true);
-    
+  enum LockLevel {
+    CLUSTER(0),
+    COLLECTION(1),
+    SHARD(2),
+    REPLICA(3),
+    NONE(10);
+
+    public final int level;
+
+    LockLevel(int i) {
+      this.level = i;
+    }
+
+    public LockLevel getChild() {
+      return getLevel(level + 1);
+    }
+
+    public static LockLevel getLevel(int i) {
+      for (LockLevel v : values()) {
+        if (v.level == i) return v;
+      }
+      return null;
+    }
+
+    public boolean isHigherOrEqual(LockLevel that) {
+      return that.level <= level;
+    }
+  }
+
+  enum CollectionAction {
+    CREATE(true, LockLevel.COLLECTION),
+    DELETE(true, LockLevel.COLLECTION),
+    RELOAD(true, LockLevel.COLLECTION),
+    SYNCSHARD(true, LockLevel.SHARD),
+    CREATEALIAS(true, LockLevel.COLLECTION),
+    DELETEALIAS(true, LockLevel.COLLECTION),
+    SPLITSHARD(true, LockLevel.SHARD),
+    DELETESHARD(true, LockLevel.SHARD),
+    CREATESHARD(true, LockLevel.COLLECTION),
+    DELETEREPLICA(true, LockLevel.SHARD),
+    FORCELEADER(true, LockLevel.SHARD),
+    MIGRATE(true, LockLevel.SHARD),
+    ADDROLE(true, LockLevel.NONE),
+    REMOVEROLE(true, LockLevel.NONE),
+    CLUSTERPROP(true, LockLevel.NONE),
+    REQUESTSTATUS(false, LockLevel.NONE),
+    DELETESTATUS(false, LockLevel.NONE),
+    ADDREPLICA(true, LockLevel.SHARD),
+    OVERSEERSTATUS(false, LockLevel.NONE),
+    LIST(false, LockLevel.NONE),
+    CLUSTERSTATUS(false, LockLevel.NONE),
+    ADDREPLICAPROP(true, LockLevel.REPLICA),
+    DELETEREPLICAPROP(true, LockLevel.REPLICA),
+    BALANCESHARDUNIQUE(true, LockLevel.SHARD),
+    REBALANCELEADERS(true, LockLevel.COLLECTION),
+    MODIFYCOLLECTION(true, LockLevel.COLLECTION),
+    MIGRATESTATEFORMAT(true, LockLevel.CLUSTER),
+    BACKUP(true, LockLevel.COLLECTION),
+    RESTORE(true, LockLevel.COLLECTION),
+    //only for testing. it just waits for specified time
+    // these are not exposed via collection API commands
+    // but the overseer is aware of these tasks
+    MOCK_COLL_TASK(false, LockLevel.COLLECTION),
+    MOCK_SHARD_TASK(false, LockLevel.SHARD),
+    MOCK_REPLICA_TASK(false, LockLevel.REPLICA)
+    ;
     public final boolean isWrite;
+    public final LockLevel lockLevel;
 
-    CollectionAction(boolean isWrite) {
+    CollectionAction(boolean isWrite, LockLevel level) {
       this.isWrite = isWrite;
+      this.lockLevel = level;
     }
 
     public static CollectionAction get(String p) {
-      if( p != null ) {
+      if (p != null) {
         try {
-          return CollectionAction.valueOf( p.toUpperCase(Locale.ROOT) );
+          return CollectionAction.valueOf(p.toUpperCase(Locale.ROOT));
+        } catch (Exception ex) {
         }
-        catch( Exception ex ) {}
       }
       return null;
     }
-    public boolean isEqual(String s){
-      if(s == null) return false;
+
+    public boolean isEqual(String s) {
+      if (s == null) return false;
       return toString().equals(s.toUpperCase(Locale.ROOT));
     }
-    public String toLower(){
+
+    public String toLower() {
       return toString().toLowerCase(Locale.ROOT);
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/734dcb99/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java b/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java
index 5fa0fae..995e142 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java
@@ -149,10 +149,11 @@ public class StrUtils {
    * @see #escapeTextWithSeparator
    */
   public static String join(Collection<?> items, char separator) {
+    if (items == null) return "";
     StringBuilder sb = new StringBuilder(items.size() << 3);
     boolean first=true;
     for (Object o : items) {
-      String item = o.toString();
+      String item = String.valueOf(o);
       if (first) {
         first = false;
       } else {


Mime
View raw message