lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tflo...@apache.org
Subject lucene-solr:master: SOLR-11739: Don't accept duplicate async IDs in collection API operations
Date Fri, 16 Feb 2018 00:17:59 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/master 53640b95d -> 61ea8f60b


SOLR-11739: Don't accept duplicate async IDs in collection API operations


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/61ea8f60
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/61ea8f60
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/61ea8f60

Branch: refs/heads/master
Commit: 61ea8f60b1c69ba9ed753fe533d571fcbb452887
Parents: 53640b9
Author: Tomas Fernandez Lobbe <tflobbe@apache.org>
Authored: Thu Feb 15 15:41:48 2018 -0800
Committer: Tomas Fernandez Lobbe <tflobbe@apache.org>
Committed: Thu Feb 15 16:14:16 2018 -0800

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../org/apache/solr/cloud/DistributedMap.java   |  30 +++-
 .../java/org/apache/solr/cloud/Overseer.java    |  11 +-
 .../apache/solr/cloud/OverseerTaskQueue.java    |   2 +-
 .../solr/cloud/SizeLimitedDistributedMap.java   |  21 ++-
 .../org/apache/solr/cloud/ZkController.java     |  42 +++++
 .../OverseerCollectionMessageHandler.java       |   2 +-
 .../solr/handler/admin/CollectionsHandler.java  |  54 +++++-
 .../solr/handler/admin/RebalanceLeaders.java    |   2 +
 .../apache/solr/cloud/TestDistributedMap.java   | 180 +++++++++++++++++++
 .../cloud/TestSizeLimitedDistributedMap.java    |  71 ++++----
 .../CollectionsAPIAsyncDistributedZkTest.java   |  86 ++++++++-
 .../org/apache/solr/cloud/ZkTestServer.java     |   1 -
 13 files changed, 455 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d5efe44..b173872 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -208,6 +208,8 @@ Bug Fixes
 
 * SOLR-11950: Allow CLUSTERSTATUS "shard" parameter to accept comma (,) delimited list (Chris Ulicny via Jason Gerlowski)
 
+* SOLR-11739: Fix race condition that made Solr accept duplicate async IDs in collection API operations (Tomás Fernández Löbbe)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
index 7518208..c9f12e9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
@@ -16,16 +16,18 @@
  */
 package org.apache.solr.cloud;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.data.Stat;
 
-import java.util.List;
-
 /**
  * A distributed map.
  * This supports basic map functions e.g. get, put, contains for interaction with zk which
@@ -58,6 +60,19 @@ public class DistributedMap {
   public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException
{
     zookeeper.makePath(dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null,
false, true);
   }
+  
+  /**
+   * Puts an element in the map only if there isn't one with the same trackingId already
+   * @return True if the element was added. False if it wasn't (because the key already exists)
+   */
+  public boolean putIfAbsent(String trackingId, byte[] data) throws KeeperException, InterruptedException
{
+    try {
+      zookeeper.makePath(dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null,
true, true);
+      return true;
+    } catch (NodeExistsException e) {
+      return false;
+    }
+  }
 
   public byte[] get(String trackingId) throws KeeperException, InterruptedException {
     return zookeeper.getData(dir + "/" + PREFIX + trackingId, null, null, true);
@@ -97,5 +112,16 @@ public class DistributedMap {
     }
 
   }
+  
+  /**
+   * Returns the keys of all the elements in the map
+   */
+  public Collection<String> keys() throws KeeperException, InterruptedException {
+    List<String> childs = zookeeper.getChildren(dir, null, true);
+    final List<String> ids = new ArrayList<>(childs.size());
+    childs.stream().forEach((child) -> ids.add(child.substring(PREFIX.length())));
+    return ids;
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index edf3838..dbadcde 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -679,13 +679,19 @@ public class Overseer implements SolrCloseable {
   /* Size-limited map for successfully completed tasks*/
   static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
     createOverseerNode(zkClient);
-    return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed",
NUM_RESPONSES_TO_STORE);
+    return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed",
NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
   }
 
   /* Map for failed tasks, not to be used outside of the Overseer */
   static DistributedMap getFailureMap(final SolrZkClient zkClient) {
     createOverseerNode(zkClient);
-    return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE);
+    return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE,
(child) -> getAsyncIdsMap(zkClient).remove(child));
+  }
+  
+  /* Map of async IDs currently in use*/
+  static DistributedMap getAsyncIdsMap(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient);
+    return new DistributedMap(zkClient, "/overseer/async_ids");
   }
 
   /**
@@ -770,6 +776,7 @@ public class Overseer implements SolrCloseable {
     createOverseerNode(zkClient);
     return getCollectionQueue(zkClient, zkStats);
   }
+  
 
   private static void createOverseerNode(final SolrZkClient zkClient) {
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 2767258..3df6501 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -70,7 +70,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
           if (data != null) {
             ZkNodeProps message = ZkNodeProps.load(data);
             if (message.containsKey(requestIdKey)) {
-              LOG.debug(">>>> {}", message.get(requestIdKey));
+              LOG.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
               if(message.get(requestIdKey).equals(requestId)) return true;
             }
           }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
index 6501b8c..7f7e75f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
@@ -17,7 +17,6 @@
 package org.apache.solr.cloud;
 
 import java.util.List;
-
 import org.apache.lucene.util.PriorityQueue;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.zookeeper.KeeperException;
@@ -34,9 +33,19 @@ public class SizeLimitedDistributedMap extends DistributedMap {
 
   private final int maxSize;
 
+  /**
+   * This observer will be called when this map overflows, and deletes the excess of elements
+   */
+  private final OnOverflowObserver onOverflowObserver;
+
   public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize) {
+    this(zookeeper, dir, maxSize, null);
+  }
+  
+  public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize, OnOverflowObserver
onOverflowObserver) {
     super(zookeeper, dir);
     this.maxSize = maxSize;
+    this.onOverflowObserver = onOverflowObserver;
   }
 
   @Override
@@ -47,7 +56,7 @@ public class SizeLimitedDistributedMap extends DistributedMap {
 
       int cleanupSize = maxSize / 10;
 
-      final PriorityQueue priorityQueue = new PriorityQueue<Long>(cleanupSize) {
+      final PriorityQueue<Long> priorityQueue = new PriorityQueue<Long>(cleanupSize)
{
         @Override
         protected boolean lessThan(Long a, Long b) {
           return (a > b);
@@ -63,11 +72,17 @@ public class SizeLimitedDistributedMap extends DistributedMap {
 
       for (String child : children) {
         Stat stat = zookeeper.exists(dir + "/" + child, null, true);
-        if (stat.getMzxid() <= topElementMzxId)
+        if (stat.getMzxid() <= topElementMzxId) {
           zookeeper.delete(dir + "/" + child, -1, true);
+          if (onOverflowObserver != null) onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
+        }
       }
     }
 
     super.put(trackingId, data);
   }
+  
+  interface OnOverflowObserver {
+    void onChildDelete(String child) throws KeeperException, InterruptedException;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 7898e96..cb1fcea 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -149,6 +149,7 @@ public class ZkController {
   private final DistributedMap overseerRunningMap;
   private final DistributedMap overseerCompletedMap;
   private final DistributedMap overseerFailureMap;
+  private final DistributedMap asyncIdsMap;
 
   public final static String COLLECTION_PARAM_PREFIX = "collection.";
   public final static String CONFIGNAME_PROP = "configName";
@@ -436,6 +437,8 @@ public class ZkController {
     this.overseerRunningMap = Overseer.getRunningMap(zkClient);
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
     this.overseerFailureMap = Overseer.getFailureMap(zkClient);
+    this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient);
+
     zkStateReader = new ZkStateReader(zkClient, () -> {
       if (cc != null) cc.securityNodeChanged();
     });
@@ -1930,6 +1933,45 @@ public class ZkController {
     return overseerFailureMap;
   }
 
+  /**
+   * When an operation needs to be performed in an asynchronous mode, the asyncId needs
+   * to be claimed by calling this method to make sure it's not duplicate (hasn't been
+   * claimed by other request). If this method returns true, the asyncId in the parameter
+   * has been reserved for the operation, meaning that no other thread/operation can claim
+   * it. If for whatever reason, the operation is not scheduled, the asyncId needs to be
+   * cleared using {@link #clearAsyncId(String)}.
+   * If this method returns false, no reservation has been made, and this asyncId can't 
+   * be used, since it's being used by another operation (currently or in the past)
+   * @param asyncId A string representing the asyncId of an operation. Can't be null.
+   * @return True if the reservation succeeds.
+   *         False if this ID is already in use.
+   */
+  public boolean claimAsyncId(String asyncId) throws KeeperException {
+    try {
+      return asyncIdsMap.putIfAbsent(asyncId, new byte[0]);
+    } catch (InterruptedException e) {
+      log.error("Could not claim asyncId=" + asyncId, e);
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * Clears an asyncId previously claimed by calling {@link #claimAsyncId(String)}
+   * @param asyncId A string representing the asyncId of an operation. Can't be null.
+   * @return True if the asyncId existed and was cleared.
+   *         False if the asyncId didn't exist before.
+   */
+  public boolean clearAsyncId(String asyncId) throws KeeperException {
+    try {
+      return asyncIdsMap.remove(asyncId);
+    } catch (InterruptedException e) {
+      log.error("Could not release asyncId=" + asyncId, e);
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
   public int getClientTimeout() {
     return clientTimeout;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 6519a8e..2143d1e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -810,7 +810,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
    */
   List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
                      NamedList results, Replica.State stateMatcher, String asyncId, Map<String,
String> requestMap, Set<String> okayExceptions) {
-    log.info("Executing Collection Cmd : " + params);
+    log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
     String collectionName = message.getStr(NAME);
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 63b9f16..4933559 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -279,6 +279,14 @@ public class CollectionsHandler extends RequestHandlerBase implements
Permission
 
   static final Set<String> KNOWN_ROLES = ImmutableSet.of("overseer");
 
+  /*
+   * In SOLR-11739 we change the way the async IDs are checked to decide if one has
+   * already been used or not. For backward compatibility, we continue to check in the
+   * old way (meaning, in all the queues) for now. This extra check should be removed
+   * in Solr 9 
+   */
+  private static final boolean CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS = true;
+
   public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000;
 
   public SolrResponse sendToOCPQueue(ZkNodeProps m) throws KeeperException, InterruptedException
{
@@ -294,21 +302,40 @@ public class CollectionsHandler extends RequestHandlerBase implements
Permission
 
        String asyncId = m.getStr(ASYNC);
 
-       if(asyncId.equals("-1")) {
+       if (asyncId.equals("-1")) {
          throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved
for cleanup purposes.");
        }
 
        NamedList<String> r = new NamedList<>();
-
-       if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
+       
+       if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && (
+           coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
            coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
            coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
-           overseerCollectionQueueContains(asyncId)) {
+           overseerCollectionQueueContains(asyncId))) {
+         // for back compatibility, check in the old places. This can be removed in Solr 9
          r.add("error", "Task with the same requestid already exists.");
-
        } else {
-         coreContainer.getZkController().getOverseerCollectionQueue()
+         if (coreContainer.getZkController().claimAsyncId(asyncId)) {
+           boolean success = false;
+           try {
+             coreContainer.getZkController().getOverseerCollectionQueue()
              .offer(Utils.toJSON(m));
+             success = true;
+           } finally {
+             if (!success) {
+               try {
+                 coreContainer.getZkController().clearAsyncId(asyncId);
+               } catch (Exception e) {
+                 // let the original exception bubble up
+                 log.error("Unable to release async ID={}", asyncId, e);
+                 SolrZkClient.checkInterrupted(e);
+               }
+             }
+           }
+         } else {
+           r.add("error", "Task with the same requestid already exists.");
+         }
        }
        r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
 
@@ -708,18 +735,29 @@ public class CollectionsHandler extends RequestHandlerBase implements
Permission
         }
 
         if (flush) {
-          zkController.getOverseerCompletedMap().clear();
-          zkController.getOverseerFailureMap().clear();
+          Collection<String> completed = zkController.getOverseerCompletedMap().keys();
+          Collection<String> failed = zkController.getOverseerFailureMap().keys();
+          for (String asyncId:completed) {
+            zkController.getOverseerCompletedMap().remove(asyncId);
+            zkController.clearAsyncId(asyncId);
+          }
+          for (String asyncId:failed) {
+            zkController.getOverseerFailureMap().remove(asyncId);
+            zkController.clearAsyncId(asyncId);
+          }
           rsp.getValues().add("status", "successfully cleared stored collection api responses");
           return null;
         } else {
           // Request to cleanup
           if (zkController.getOverseerCompletedMap().remove(requestId)) {
+            zkController.clearAsyncId(requestId);
             rsp.getValues().add("status", "successfully removed stored response for [" +
requestId + "]");
           } else if (zkController.getOverseerFailureMap().remove(requestId)) {
+            zkController.clearAsyncId(requestId);
             rsp.getValues().add("status", "successfully removed stored response for [" +
requestId + "]");
           } else {
             rsp.getValues().add("status", "[" + requestId + "] not found in stored responses");
+            // Don't call zkController.clearAsyncId for this, since it could be a running/pending task
           }
         }
         return null;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
index 53e9fde..f0819bd 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
@@ -287,6 +287,7 @@ class RebalanceLeaders {
         String asyncId = pair.getKey();
         if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
           coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
+          coreContainer.getZkController().clearAsyncId(asyncId);
           NamedList<Object> fails = (NamedList<Object>) results.get("failures");
           if (fails == null) {
             fails = new NamedList<>();
@@ -300,6 +301,7 @@ class RebalanceLeaders {
           foundChange = true;
         } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId))
{
           coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
+          coreContainer.getZkController().clearAsyncId(asyncId);
           NamedList<Object> successes = (NamedList<Object>) results.get("successes");
           if (successes == null) {
             successes = new NamedList<>();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/test/org/apache/solr/cloud/TestDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestDistributedMap.java b/solr/core/src/test/org/apache/solr/cloud/TestDistributedMap.java
new file mode 100644
index 0000000..ae05dd5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestDistributedMap.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.util.Locale;
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestDistributedMap extends SolrTestCaseJ4 {
+  
+  private static Path zkDir;
+  
+  protected static ZkTestServer zkServer;
+  
+  @BeforeClass
+  public static void setUpClass() throws InterruptedException {
+    zkDir = createTempDir("TestDistributedMap");
+    zkServer = new ZkTestServer(zkDir.toFile().getAbsolutePath());
+    zkServer.run();
+  }
+  
+  @AfterClass
+  public static void tearDownClass() throws IOException, InterruptedException {
+    
+    if (zkServer != null) {
+      zkServer.shutdown();
+      zkServer = null;
+    }
+    FileUtils.deleteDirectory(zkDir.toFile());
+    zkDir = null;
+  }
+  
+  public void testPut() throws KeeperException, InterruptedException {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      DistributedMap map = createMap(zkClient, path);
+      assertFalse(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
+      map.put("foo", new byte[0]);
+      assertTrue(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
+    }
+  }
+  
+  public void testGet() throws KeeperException, InterruptedException {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      byte[] data = "data".getBytes(Charset.defaultCharset());
+      zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", data, CreateMode.PERSISTENT,
null, false, true);
+      DistributedMap map = createMap(zkClient, path);
+      assertArrayEquals(data,  map.get("foo"));
+    }
+  }
+  
+  public void testContains() throws KeeperException, InterruptedException {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      DistributedMap map = createMap(zkClient, path);
+      assertFalse(map.contains("foo"));
+      zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", new byte[0], CreateMode.PERSISTENT,
null, false, true);
+      assertTrue(map.contains("foo"));
+    }
+  }
+  
+  public void testRemove() throws KeeperException, InterruptedException {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      DistributedMap map = createMap(zkClient, path);
+      assertFalse(map.remove("foo"));
+      zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", new byte[0], CreateMode.PERSISTENT,
null, false, true);
+      assertTrue(map.remove("foo"));
+      assertFalse(map.contains("foo"));
+      assertFalse(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
+    }
+  }
+  
+  public void testSize() throws KeeperException, InterruptedException {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      DistributedMap map = createMap(zkClient, path);
+      assertEquals(0, map.size());
+      map.remove("bar");
+      assertEquals(0, map.size());
+      map.put("foo", new byte[0]);
+      assertEquals(1, map.size());
+      map.put("foo2", new byte[0]);
+      assertEquals(2, map.size());
+      map.remove("foo");
+      assertEquals(1, map.size());
+    }
+  }
+  
+  public void testPutIfAbsent() throws KeeperException, InterruptedException {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      DistributedMap map = createMap(zkClient, path);
+      assertEquals(0, map.size());
+      assertFalse(map.contains("foo"));
+      assertTrue(map.putIfAbsent("foo", new byte[0]));
+      assertEquals(1, map.size());
+      assertTrue(map.contains("foo"));
+      assertFalse(map.putIfAbsent("foo", new byte[0]));
+      assertTrue(map.contains("foo"));
+      assertEquals(1, map.size());
+      map.remove("foo");
+      assertFalse(map.contains("foo"));
+      assertEquals(0, map.size());
+      assertTrue(map.putIfAbsent("foo", new byte[0]));
+      assertEquals(1, map.size());
+      assertTrue(map.contains("foo"));
+    }
+    
+  }
+  
+  public void testKeys() throws KeeperException, InterruptedException {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      DistributedMap map = createMap(zkClient, path);
+      assertEquals(0, map.keys().size());
+      map.put("foo", new byte[0]);
+      assertTrue(map.keys().contains("foo"));
+      assertEquals(1, map.keys().size());
+      
+      map.put("bar", new byte[0]);
+      assertTrue(map.keys().contains("bar"));
+      assertTrue(map.keys().contains("foo"));
+      assertEquals(2, map.keys().size());
+      
+      map.remove("foo");
+      assertTrue(map.keys().contains("bar"));
+      assertEquals(1, map.keys().size());
+    }
+  }
+  
+  public void testClear() throws KeeperException, InterruptedException {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      DistributedMap map = createMap(zkClient, path);
+      map.clear();
+      assertEquals(0, map.size());
+      map.put("foo", new byte[0]);
+      map.put("bar", new byte[0]);
+      assertEquals(2, map.size());
+      map.clear();
+      assertEquals(0, map.size());
+    }
+  }
+  
+  protected DistributedMap createMap(SolrZkClient zkClient, String path) {
+    return new DistributedMap(zkClient, path);
+  }
+  
+  protected String getAndMakeInitialPath(SolrZkClient zkClient) throws KeeperException, InterruptedException
{
+    String path = String.format(Locale.ROOT, "/%s/%s", getClass().getName(), getTestName());
+    zkClient.makePath(path, false, true);
+    return path;
+  }
+  
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java b/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
index 801403a..879a4e6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
@@ -17,42 +17,53 @@
 
 package org.apache.solr.cloud;
 
-import org.apache.solr.SolrTestCaseJ4;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
 import org.apache.solr.common.cloud.SolrZkClient;
 
-public class TestSizeLimitedDistributedMap extends SolrTestCaseJ4 {
+public class TestSizeLimitedDistributedMap extends TestDistributedMap {
 
   public void testCleanup() throws Exception {
-    String zkDir = createTempDir("TestSizeLimitedDistributedMap").toFile().getAbsolutePath();
-
-    ZkTestServer server = new ZkTestServer(zkDir);
-    try {
-      server.run();
-
-      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
-      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-
-      try (SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), 10000)) {
-        DistributedMap map = Overseer.getCompletedMap(zkClient);
-        assertTrue(map instanceof SizeLimitedDistributedMap);
-        for (int i = 0; i < Overseer.NUM_RESPONSES_TO_STORE; i++) {
-          map.put("xyz_" + i, new byte[0]);
-        }
+    final List<String> deletedItems = new LinkedList<>();
+    final Set<String> expectedKeys = new HashSet<>();
+    int numResponsesToStore=TEST_NIGHTLY?Overseer.NUM_RESPONSES_TO_STORE:100;
+    
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      DistributedMap map = new SizeLimitedDistributedMap(zkClient, path, numResponsesToStore,
(element)->deletedItems.add(element));
+      for (int i = 0; i < numResponsesToStore; i++) {
+        map.put("xyz_" + i, new byte[0]);
+        expectedKeys.add("xyz_" + i);
+      }
 
-        assertEquals("Number of items do not match", Overseer.NUM_RESPONSES_TO_STORE, map.size());
-        // add another to trigger cleanup
-        map.put("xyz_10000", new byte[0]);
-        assertEquals("Distributed queue was not cleaned up",
-            Overseer.NUM_RESPONSES_TO_STORE - (Overseer.NUM_RESPONSES_TO_STORE / 10) + 1,
map.size());
-        for (int i = Overseer.NUM_RESPONSES_TO_STORE; i >= Overseer.NUM_RESPONSES_TO_STORE
/ 10; i--) {
-          assertTrue(map.contains("xyz_" + i));
-        }
-        for (int i = Overseer.NUM_RESPONSES_TO_STORE / 10 - 1; i >= 0; i--) {
-          assertFalse(map.contains("xyz_" + i));
-        }
+      assertEquals("Number of items do not match", numResponsesToStore, map.size());
+      assertTrue("Expected keys do not match", expectedKeys.containsAll(map.keys()));
+      assertTrue("Expected keys do not match", map.keys().containsAll(expectedKeys));
+      // add another to trigger cleanup
+      map.put("xyz_" + numResponsesToStore, new byte[0]);
+      expectedKeys.add("xyz_" + numResponsesToStore);
+      assertEquals("Distributed queue was not cleaned up",
+          numResponsesToStore - (numResponsesToStore / 10) + 1, map.size());
+      for (int i = numResponsesToStore; i >= numResponsesToStore / 10; i--) {
+        assertTrue(map.contains("xyz_" + i));
+      }
+      for (int i = numResponsesToStore / 10 - 1; i >= 0; i--) {
+        assertFalse(map.contains("xyz_" + i));
+        assertTrue(deletedItems.contains("xyz_" + i));
+        expectedKeys.remove("xyz_" + i);
       }
-    } finally {
-      server.shutdown();
+      assertTrue("Expected keys do not match", expectedKeys.containsAll(map.keys()));
+      assertTrue("Expected keys do not match", map.keys().containsAll(expectedKeys));
+      map.remove("xyz_" + numResponsesToStore);
+      assertFalse("map.remove shouldn't trigger the observer", 
+          deletedItems.contains("xyz_" + numResponsesToStore));
     }
   }
+  
+  protected DistributedMap createMap(SolrZkClient zkClient, String path) {
+    return new SizeLimitedDistributedMap(zkClient, path, Overseer.NUM_RESPONSES_TO_STORE,
null);
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
index eff0d8e..8c8b11a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
@@ -16,13 +16,22 @@
  */
 package org.apache.solr.cloud.api.collections;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.cloud.SolrCloudTestCase;
@@ -30,8 +39,12 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests the Cloud Collections API.
@@ -40,6 +53,8 @@ import org.junit.Test;
 public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
 
   private static final int MAX_TIMEOUT_SECONDS = 60;
+  
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -174,5 +189,74 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
         .processAndWait(client, MAX_TIMEOUT_SECONDS);
     assertSame("DeleteCollection did not complete", RequestStatusState.COMPLETED, state);
   }
+  
+  /**
+   * Verifies that concurrent async Collection API requests reusing one async id are
+   * de-duplicated. Several threads are released together by a latch and each submits a RELOAD
+   * with async id {@code "repeatedId"} to a randomly chosen node. Exactly one submission must be
+   * accepted; every other one must fail with "Task with the same requestid already exists.".
+   */
+  public void testAsyncIdRaceCondition() throws Exception {
+    final int numThreads = 10;
+    final AtomicInteger numSuccess = new AtomicInteger(0);
+    final AtomicInteger numFailure = new AtomicInteger(0);
+    final CountDownLatch latch = new CountDownLatch(numThreads);
+    final SolrClient[] clients = new SolrClient[cluster.getJettySolrRunners().size()];
+    ExecutorService es = null;
+    // Every resource is allocated inside the try so that a failure anywhere (client creation,
+    // collection creation, or the assertions) still closes the clients and stops the executor.
+    try {
+      int j = 0;
+      for (JettySolrRunner r : cluster.getJettySolrRunners()) {
+        clients[j++] = new HttpSolrClient.Builder(r.getBaseUrl().toString()).build();
+      }
+      RequestStatusState state = CollectionAdminRequest.createCollection("testAsyncIdRaceCondition","conf1",1,1)
+          .setRouterName("implicit")
+          .setShards("shard1")
+          .processAndWait(cluster.getSolrClient(), MAX_TIMEOUT_SECONDS);
+      assertSame("CreateCollection task did not complete!", RequestStatusState.COMPLETED, state);
+
+      es = ExecutorUtil.newMDCAwareFixedThreadPool(numThreads, new DefaultSolrThreadFactory("testAsyncIdRaceCondition"));
+      for (int i = 0; i < numThreads; i++) {
+        es.submit(new Runnable() {
+
+          @Override
+          public void run() {
+            CollectionAdminRequest.Reload reloadCollectionRequest = CollectionAdminRequest.reloadCollection("testAsyncIdRaceCondition");
+            // Release all workers at the same moment to maximize contention on the shared async id.
+            latch.countDown();
+            try {
+              latch.await();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            }
+
+            try {
+              log.info("{} - Reloading Collection.", Thread.currentThread().getName());
+              reloadCollectionRequest.processAsync("repeatedId", clients[random().nextInt(clients.length)]);
+              numSuccess.incrementAndGet();
+            } catch (SolrServerException e) {
+              log.info(e.getMessage());
+              // The Future returned by submit() is discarded, so a failure here shows up only
+              // indirectly: numFailure stays below the expected count checked further down.
+              assertEquals("Task with the same requestid already exists.", e.getMessage());
+              numFailure.incrementAndGet();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+      es.shutdown();
+      assertTrue("Async reload submissions did not finish in time",
+          es.awaitTermination(MAX_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+      assertEquals("Exactly one request with the repeated async id should be accepted",
+          1, numSuccess.get());
+      assertEquals("Every other request with the repeated async id should be rejected",
+          numThreads - 1, numFailure.get());
+    } finally {
+      if (es != null) {
+        // No-op after a clean termination; interrupts stragglers if awaitTermination failed.
+        es.shutdownNow();
+      }
+      // Close each client independently so one failing close() does not leak the rest.
+      for (SolrClient client : clients) {
+        if (client != null) {
+          try {
+            client.close();
+          } catch (IOException e) {
+            log.warn("Failed to close SolrClient", e);
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Back-compat check: an async id that already exists as an "mn-"-prefixed node under
+   * /overseer/collection-map-completed must still count as taken, so a new CREATE request
+   * reusing that id is rejected. The "mn-" node layout is presumably what older versions wrote.
+   */
+  public void testAsyncIdBackCompat() throws Exception {
+    // TODO: remove with Solr 9
+    final String completedNodePath = "/overseer/collection-map-completed/mn-testAsyncIdBackCompat";
+    cluster.getZkClient().makePath(completedNodePath, true, true);
+    final CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection("testAsyncIdBackCompat", "conf1", 1, 1);
+    try {
+      create.processAsync("testAsyncIdBackCompat", cluster.getSolrClient());
+    } catch (SolrServerException e) {
+      assertTrue(e.getMessage().contains("Task with the same requestid already exists"));
+      return;
+    }
+    fail("Expecting exception");
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/61ea8f60/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 75418c6..e432bb0 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -520,7 +520,6 @@ public class ZkTestServer {
     log.info("start zk server on port:" + port);
   }
 
-  @SuppressWarnings("deprecation")
   public void shutdown() throws IOException, InterruptedException {
     // TODO: this can log an exception while trying to unregister a JMX MBean
     zkServer.shutdown();


Mime
View raw message