lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [lucene-solr] branch master updated: SOLR-14749: Improve support for arbitrary container-level plugins. Add ClusterSingleton support for plugins that require only one active instance in the cluster.
Date Wed, 21 Oct 2020 17:33:15 GMT
This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 67ecd8f  SOLR-14749: Improve support for arbitrary container-level plugins. Add ClusterSingleton
support for plugins that require only one active instance in the cluster.
67ecd8f is described below

commit 67ecd8ff9ad3640016424ded86bfaaadd815b96d
Author: Andrzej Bialecki <ab@apache.org>
AuthorDate: Wed Oct 21 17:22:44 2020 +0200

    SOLR-14749: Improve support for arbitrary container-level plugins. Add ClusterSingleton
    support for plugins that require only one active instance in the cluster.
---
 solr/CHANGES.txt                                   |   3 +
 .../src/java/org/apache/solr/api/AnnotatedApi.java |  15 +-
 .../apache/solr/api/CustomContainerPlugins.java    |  66 ++++++-
 .../org/apache/solr/cloud/ClusterSingleton.java    |  73 ++++++++
 .../src/java/org/apache/solr/cloud/Overseer.java   |  14 ++
 .../org/apache/solr/core/ClusterSingletons.java    | 193 +++++++++++++++++++++
 .../java/org/apache/solr/core/CoreContainer.java   |  26 +++
 .../solr/handler/admin/ContainerPluginsApi.java    |  17 +-
 .../apache/solr/packagemanager/PackageManager.java |   3 +-
 .../test/org/apache/solr/cloud/OverseerTest.java   |  24 ++-
 .../apache/solr/handler/TestContainerPlugin.java   |  73 +++++++-
 11 files changed, 485 insertions(+), 22 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9b43764..830b45a 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -16,6 +16,9 @@ New Features
 
 * SOLR-13528 Rate Limiting in Solr (Atri Sharma, Mike Drob)
 
+* SOLR-14749: Improve support for arbitrary container-level plugins. Add ClusterSingleton
+  support for plugins that require only one active instance in the cluster. (ab, noble)
+
 Improvements
 ----------------------
 * LUCENE-8984: MoreLikeThis MLT is biased for uncommon fields (Andy Hind via Anshum Gupta)
diff --git a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
index fd77413..f9f97a4 100644
--- a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
+++ b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
@@ -85,9 +85,18 @@ public class AnnotatedApi extends Api implements PermissionNameProvider
, Closea
   }
 
   public static List<Api> getApis(Object obj) {
-    return getApis(obj.getClass(), obj);
+    return getApis(obj.getClass(), obj, true);
   }
-  public static List<Api> getApis(Class<? extends Object> theClass , Object obj)
 {
+
+  /**
+   * Get a list of Api-s supported by this class.
+   * @param theClass class
+   * @param obj object of this class (may be null)
+   * @param allowEmpty if false then an exception is thrown if no Api-s can be retrieved,
if true
+   *                then absence of Api-s is silently ignored.
+   * @return list of discovered Api-s
+   */
+  public static List<Api> getApis(Class<? extends Object> theClass , Object obj,
boolean allowEmpty)  {
     Class<?> klas = null;
     try {
       klas = MethodHandles.publicLookup().accessClass(theClass);
@@ -122,7 +131,7 @@ public class AnnotatedApi extends Api implements PermissionNameProvider
, Closea
         SpecProvider specProvider = readSpec(endPoint, Collections.singletonList(m));
         apis.add(new AnnotatedApi(specProvider, endPoint, Collections.singletonMap("", cmd),
null));
       }
-      if (apis.isEmpty()) {
+      if (!allowEmpty && apis.isEmpty()) {
         throw new RuntimeException("Invalid Class : " + klas.getName() + " No @EndPoints");
       }
 
diff --git a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
index 93de2e3..91b0452 100644
--- a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
+++ b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.lucene.util.ResourceLoaderAware;
@@ -54,12 +56,24 @@ import org.slf4j.LoggerFactory;
 import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
 import static org.apache.solr.common.util.Utils.makeMap;
 
+/**
+ * This class manages the container-level plugins and their Api-s. It is
+ * responsible for adding / removing / replacing the plugins according to the updated
+ * configuration obtained from {@link ContainerPluginsApi#plugins(Supplier)}.
+ * <p>Plugins instantiated by this class may implement zero or more {@link Api}-s,
which
+ * are then registered in the CoreContainer {@link ApiBag}. They may be also post-processed
+ * for additional functionality by {@link PluginRegistryListener}-s registered with
+ * this class.</p>
+ */
 public class CustomContainerPlugins implements ClusterPropertiesListener, MapWriter {
-  private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  final CoreContainer coreContainer;
-  final ApiBag containerApiBag;
+  private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
+
+  private final List<PluginRegistryListener> listeners = new CopyOnWriteArrayList<>();
+
+  private final CoreContainer coreContainer;
+  private final ApiBag containerApiBag;
 
   private final Map<String, ApiInfo> currentPlugins = new HashMap<>();
 
@@ -68,6 +82,14 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
     refresh();
     return false;
   }
+
+  public void registerListener(PluginRegistryListener listener) {
+    listeners.add(listener);
+  }
+  public void unregisterListener(PluginRegistryListener listener) {
+    listeners.remove(listener);
+  }
+
   public CustomContainerPlugins(CoreContainer coreContainer, ApiBag apiBag) {
     this.coreContainer = coreContainer;
     this.containerApiBag = apiBag;
@@ -78,6 +100,10 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
     currentPlugins.forEach(ew.getBiConsumer());
   }
 
+  public synchronized ApiInfo getPlugin(String name) {
+    return currentPlugins.get(name);
+  }
+
   public synchronized void refresh() {
     Map<String, Object> pluginInfos = null;
     try {
@@ -107,6 +133,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
       if (e.getValue() == Diff.REMOVED) {
         ApiInfo apiInfo = currentPlugins.remove(e.getKey());
         if (apiInfo == null) continue;
+        listeners.forEach(listener -> listener.deleted(apiInfo));
         for (ApiHolder holder : apiInfo.holders) {
           Api old = containerApiBag.unregister(holder.api.getEndPoint().method()[0],
               getActualPath(apiInfo, holder.api.getEndPoint().path()[0]));
@@ -136,6 +163,8 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
             containerApiBag.register(holder, getTemplateVars(apiInfo.info));
           }
           currentPlugins.put(e.getKey(), apiInfo);
+          final ApiInfo apiInfoFinal = apiInfo;
+          listeners.forEach(listener -> listener.added(apiInfoFinal));
         } else {
           //this plugin is being updated
           ApiInfo old = currentPlugins.put(e.getKey(), apiInfo);
@@ -143,6 +172,8 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
             //register all new paths
             containerApiBag.register(holder, getTemplateVars(apiInfo.info));
           }
+          final ApiInfo apiInfoFinal = apiInfo;
+          listeners.forEach(listener -> listener.modified(old, apiInfoFinal));
           if (old != null) {
             //this is an update of the plugin. But, it is possible that
             // some paths are remved in the newer version of the plugin
@@ -201,6 +232,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
   @SuppressWarnings({"rawtypes"})
   public class ApiInfo implements ReflectMapWriter {
     List<ApiHolder> holders;
+
     @JsonProperty
     private final PluginMeta info;
 
@@ -222,7 +254,13 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
       return null;
     }
 
+    public Object getInstance() {
+      return instance;
+    }
 
+    public PluginMeta getInfo() {
+      return info.copy();
+    }
     @SuppressWarnings({"unchecked","rawtypes"})
     public ApiInfo(PluginMeta info, List<String> errs) {
       this.info = info;
@@ -268,14 +306,14 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
       }
 
       try {
-        List<Api> apis = AnnotatedApi.getApis(klas, null);
+        List<Api> apis = AnnotatedApi.getApis(klas, null, true);
         for (Object api : apis) {
           EndPoint endPoint = ((AnnotatedApi) api).getEndPoint();
           if (endPoint.path().length > 1 || endPoint.method().length > 1) {
             errs.add("Only one HTTP method and url supported for each API");
           }
           if (endPoint.method().length != 1 || endPoint.path().length != 1) {
-            errs.add("The @EndPint must have exactly one method and path attributes");
+            errs.add("The @EndPoint must have exactly one method and path attributes");
           }
           List<String> pathSegments = StrUtils.splitSmart(endPoint.path()[0], '/',
true);
           PathTrie.replaceTemplates(pathSegments, getTemplateVars(info));
@@ -320,7 +358,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
         }
       }
       this.holders = new ArrayList<>();
-      for (Api api : AnnotatedApi.getApis(instance)) {
+      for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, true)) {
         holders.add(new ApiHolder((AnnotatedApi) api));
       }
     }
@@ -359,4 +397,20 @@ public class CustomContainerPlugins implements ClusterPropertiesListener,
MapWri
 
     return null;
   }
+
+  /**
+   * Listener for notifications about added / deleted / modified plugins.
+   */
+  public interface PluginRegistryListener {
+
+    /** Called when a new plugin is added. */
+    void added(ApiInfo plugin);
+
+    /** Called when an existing plugin is deleted. */
+    void deleted(ApiInfo plugin);
+
+    /** Called when an existing plugin is replaced. */
+    void modified(ApiInfo old, ApiInfo replacement);
+
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java b/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
new file mode 100644
index 0000000..95357e2
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+/**
+ * Intended for components that should be enabled only one instance per cluster.
+ * <p>Components that implement this interface are always in one of these states:
+ * <ul>
+ *   <li>STOPPED - the default state. The component is idle and does not perform
+ *   any functions. It should also avoid holding any resources.</li>
+ *   <li>STARTING - transitional state, which leads either to RUNNING or STOPPING in
+ *   case of startup failures.</li>
+ *   <li>STOPPING - transitional state, which leads to STOPPED state.</li>
+ *   <li>RUNNING - the component is active.</li>
+ * </ul>
+ * <p>Components must be prepared to change these states multiple times in their
+ * life-cycle.</p>
+ * <p>Implementation detail: currently these components are instantiated on all nodes
+ * but they are started only on the Overseer leader, and stopped when the current
+ * node loses its Overseer leadership.</p>
+ */
+public interface ClusterSingleton {
+
+  enum State {
+    /** Component is idle. */
+    STOPPED,
+    /** Component is starting. */
+    STARTING,
+    /** Component is active. */
+    RUNNING,
+    /** Component is stopping. */
+    STOPPING
+  }
+
+  /**
+   * Unique name of this singleton. Used for registration.
+   */
+  String getName();
+
+  /**
+   * Start the operation of the component. Initially this method should set
+   * the state to STARTING, and on success it should set the state to RUNNING.
+   * @throws Exception on startup errors. The component should revert to the
+   * STOPPED state.
+   */
+  void start() throws Exception;
+
+  /**
+   * Returns the current state of the component.
+   */
+  State getState();
+
+  /**
+   * Stop the operation of the component. Initially this method should set
+   * the state to STOPPING, and on return it should set the state to STOPPED.
+   * Components should also avoid holding any resource when in STOPPED state.
+   */
+  void stop();
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 9f15bf1..fc0d0eb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -655,6 +655,8 @@ public class Overseer implements SolrCloseable {
       }
     });
 
+    getCoreContainer().getClusterSingletons().startClusterSingletons();
+
     assert ObjectReleaseTracker.track(this);
   }
 
@@ -774,6 +776,13 @@ public class Overseer implements SolrCloseable {
     }
   }
 
+  /**
+   * Start {@link ClusterSingleton} plugins when we become the leader.
+   */
+
+  /**
+   * Stop {@link ClusterSingleton} plugins when we lose leadership.
+   */
   public Stats getStats() {
     return stats;
   }
@@ -813,9 +822,14 @@ public class Overseer implements SolrCloseable {
     if (this.id != null) {
       log.info("Overseer (id={}) closing", id);
     }
+    // stop singletons only on the leader
+    if (!this.closed) {
+      getCoreContainer().getClusterSingletons().stopClusterSingletons();
+    }
     this.closed = true;
     doClose();
 
+
     assert ObjectReleaseTracker.release(this);
   }
 
diff --git a/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java b/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
new file mode 100644
index 0000000..84acb4f
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core;
+
+import org.apache.solr.api.CustomContainerPlugins;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Helper class to manage the initial registration of {@link ClusterSingleton} plugins and
+ * to track the changes in loaded plugins in {@link org.apache.solr.api.CustomContainerPlugins}.
+ */
+public class ClusterSingletons {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
+  private final Supplier<Boolean> runSingletons;
+  private final Consumer<Runnable> asyncRunner;
+  private final CustomContainerPlugins.PluginRegistryListener pluginListener;
+
+  public static final int DEFAULT_WAIT_TIMEOUT_SEC = 60;
+
+  // we use this latch to delay the initial startup of singletons, due to
+  // the leader election occurring in parallel with the rest of the load() method.
+  private final CountDownLatch readyLatch = new CountDownLatch(1);
+
+  /**
+   * Create a helper to manage singletons.
+   * @param runSingletons this function returns true when singletons should be running. It's
+   *                      Used when adding or modifying existing plugins, and when invoking
+   *                      {@link #startClusterSingletons()}.
+   * @param asyncRunner async runner that will be used for starting up each singleton.
+   */
+  public ClusterSingletons(Supplier<Boolean> runSingletons, Consumer<Runnable>
asyncRunner) {
+    this.runSingletons = runSingletons;
+    this.asyncRunner = asyncRunner;
+    // create plugin registry listener
+    pluginListener = new CustomContainerPlugins.PluginRegistryListener() {
+      @Override
+      public void added(CustomContainerPlugins.ApiInfo plugin) {
+        if (plugin == null || plugin.getInstance() == null) {
+          return;
+        }
+        // register new api
+        Object instance = plugin.getInstance();
+        if (instance instanceof ClusterSingleton) {
+          ClusterSingleton singleton = (ClusterSingleton) instance;
+          singletonMap.put(singleton.getName(), singleton);
+          // check to see if we should immediately start this singleton
+          if (isReady() && runSingletons.get()) {
+            try {
+              singleton.start();
+            } catch (Exception exc) {
+              log.warn("Exception starting ClusterSingleton {}: {}", plugin, exc);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void deleted(CustomContainerPlugins.ApiInfo plugin) {
+        if (plugin == null || plugin.getInstance() == null) {
+          return;
+        }
+        Object instance = plugin.getInstance();
+        if (instance instanceof ClusterSingleton) {
+          ClusterSingleton singleton = (ClusterSingleton) instance;
+          singleton.stop();
+          singletonMap.remove(singleton.getName());
+        }
+      }
+
+      @Override
+      public void modified(CustomContainerPlugins.ApiInfo old, CustomContainerPlugins.ApiInfo
replacement) {
+        added(replacement);
+        deleted(old);
+      }
+    };
+  }
+
+  public CustomContainerPlugins.PluginRegistryListener getPluginRegistryListener() {
+    return pluginListener;
+  }
+
+  /**
+   * Return modifiable registry of name / {@link ClusterSingleton}.
+   */
+  public Map<String, ClusterSingleton> getSingletons() {
+    return singletonMap;
+  }
+
+  /**
+   * Return true when this helper is ready to be used for singleton management.
+   */
+  public boolean isReady() {
+    return readyLatch.getCount() == 0;
+  }
+
+  /**
+   * Mark this helper as ready to be used for singleton management.
+   */
+  public void setReady() {
+    readyLatch.countDown();
+  }
+
+  /**
+   * Wait for this helper to become ready.
+   * @param timeout timeout value.
+   * @param timeUnit timeout unit.
+   * @throws InterruptedException on this thread being interrupted.
+   * @throws TimeoutException when specified timeout has elapsed but the helper is not ready.
+   */
+  public void waitUntilReady(long timeout, TimeUnit timeUnit)
+      throws InterruptedException, TimeoutException {
+    boolean await = readyLatch.await(timeout, timeUnit);
+    if (!await) {
+      throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
+    }
+  }
+
+  /**
+   * Start singletons when the helper is ready and when it's supposed to start
+   * (as determined by {@link #runSingletons} function). If the helper is not ready this
+   * method will use {@link #asyncRunner} to wait in another thread for
+   * {@link #DEFAULT_WAIT_TIMEOUT_SEC} seconds.
+   */
+  public void startClusterSingletons() {
+    final Runnable initializer = () -> {
+      if (!runSingletons.get()) {
+        return;
+      }
+      try {
+        waitUntilReady(DEFAULT_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        log.warn("Interrupted initialization of ClusterSingleton-s");
+        return;
+      } catch (TimeoutException te) {
+        log.warn("Timed out during initialization of ClusterSingleton-s (waited {} sec)",
DEFAULT_WAIT_TIMEOUT_SEC);
+        return;
+      }
+      singletonMap.forEach((name, singleton) -> {
+        if (!runSingletons.get()) {
+          return;
+        }
+        try {
+          singleton.start();
+        } catch (Exception e) {
+          log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
+        }
+      });
+    };
+    if (!isReady()) {
+      // wait until all singleton-s are ready for the first startup
+      asyncRunner.accept(initializer);
+    } else {
+      initializer.run();
+    }
+  }
+
+  /**
+   * Stop all registered singletons.
+   */
+  public void stopClusterSingletons() {
+    singletonMap.forEach((name, singleton) -> {
+      singleton.stop();
+    });
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 9f3f17f..bdcd0b4 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -68,6 +68,7 @@ import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.Credential
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ClusterSingleton;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.AlreadyClosedException;
@@ -245,6 +246,11 @@ public class CoreContainer {
 
   private final ObjectCache objectCache = new ObjectCache();
 
+  private final ClusterSingletons clusterSingletons = new ClusterSingletons(
+      () -> getZkController() != null &&
+          getZkController().getOverseer() != null &&
+          !getZkController().getOverseer().isClosed(),
+      (r) -> this.runAsync(r));
   private PackageStoreAPI packageStoreAPI;
   private PackageLoader packageLoader;
 
@@ -663,6 +669,8 @@ public class CoreContainer {
       loader.reloadLuceneSPI();
     }
 
+    customContainerPlugins.registerListener(clusterSingletons.getPluginRegistryListener());
+
     packageStoreAPI = new PackageStoreAPI(this);
     containerHandlers.getApiBag().registerObject(packageStoreAPI.readAPI);
     containerHandlers.getApiBag().registerObject(packageStoreAPI.writeAPI);
@@ -876,7 +884,21 @@ public class CoreContainer {
       ContainerPluginsApi containerPluginsApi = new ContainerPluginsApi(this);
       containerHandlers.getApiBag().registerObject(containerPluginsApi.readAPI);
       containerHandlers.getApiBag().registerObject(containerPluginsApi.editAPI);
+
+      // init ClusterSingleton-s
+
+      // register the handlers that are also ClusterSingleton
+      containerHandlers.keySet().forEach(handlerName -> {
+        SolrRequestHandler handler = containerHandlers.get(handlerName);
+        if (handler instanceof ClusterSingleton) {
+          ClusterSingleton singleton = (ClusterSingleton) handler;
+          clusterSingletons.getSingletons().put(singleton.getName(), singleton);
+        }
+      });
+
+      clusterSingletons.setReady();
       zkSys.getZkController().checkOverseerDesignate();
+
     }
     // This is a bit redundant but these are two distinct concepts for all they're accomplished
at the same time.
     status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
@@ -2078,6 +2100,10 @@ public class CoreContainer {
     return customContainerPlugins;
   }
 
+  public ClusterSingletons getClusterSingletons() {
+    return clusterSingletons;
+  }
+
   static {
     ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
index 0c7a487..f6af915 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
@@ -47,7 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
 
-
+/**
+ * API to maintain container-level plugin configurations.
+ */
 public class ContainerPluginsApi {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -62,6 +64,9 @@ public class ContainerPluginsApi {
     this.coreContainer = coreContainer;
   }
 
+  /**
+   * API for reading the current plugin configurations.
+   */
   public class Read {
     @EndPoint(method = METHOD.GET,
         path = "/cluster/plugin",
@@ -71,6 +76,9 @@ public class ContainerPluginsApi {
     }
   }
 
+  /**
+   * API for editing the plugin configurations.
+   */
   @EndPoint(method = METHOD.POST,
       path = "/cluster/plugin",
       permission = PermissionNameProvider.Name.COLL_EDIT_PERM)
@@ -146,6 +154,13 @@ public class ContainerPluginsApi {
     }
   }
 
+  /**
+   * Retrieve the current plugin configurations.
+   * @param zkClientSupplier supplier of {@link SolrZkClient}
+   * @return current plugin configurations, where keys are plugin names and values
+   * are {@link PluginMeta} plugin metadata.
+   * @throws IOException on IO errors
+   */
   @SuppressWarnings("unchecked")
   public static Map<String, Object> plugins(Supplier<SolrZkClient> zkClientSupplier)
throws IOException {
     SolrZkClient zkClient = zkClientSupplier.get();
diff --git a/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java b/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
index 424f604..6f2f618 100644
--- a/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
+++ b/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
@@ -52,6 +52,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.filestore.DistribPackageStore;
+import org.apache.solr.handler.admin.ContainerPluginsApi;
 import org.apache.solr.packagemanager.SolrPackage.Command;
 import org.apache.solr.packagemanager.SolrPackage.Manifest;
 import org.apache.solr.packagemanager.SolrPackage.Plugin;
@@ -231,7 +232,7 @@ public class PackageManager implements Closeable {
       }
     }
     @SuppressWarnings({"unchecked"})
-    Map<String, Object> clusterPlugins = (Map<String, Object>) result.getOrDefault("plugin",
Collections.emptyMap());
+    Map<String, Object> clusterPlugins = (Map<String, Object>) result.getOrDefault(ContainerPluginsApi.PLUGIN,
Collections.emptyMap());
     for (String key : clusterPlugins.keySet()) {
       // Map<String, String> pluginMeta = (Map<String, String>) clusterPlugins.get(key);
       PluginMeta pluginMeta;
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index b6372a0..89fda3e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -17,13 +17,7 @@
 package org.apache.solr.cloud;
 
 import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -73,6 +67,7 @@ import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloudConfig;
+import org.apache.solr.core.ClusterSingletons;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrResourceLoader;
@@ -117,7 +112,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
   private static SolrZkClient zkClient;
 
-
   private volatile boolean testDone = false;
 
   private final List<ZkController> zkControllers = Collections.synchronizedList(new
ArrayList<>());
@@ -127,7 +121,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
   private final List<HttpShardHandlerFactory> httpShardHandlerFactorys = Collections.synchronizedList(new
ArrayList<>());
   private final List<UpdateShardHandler> updateShardHandlers = Collections.synchronizedList(new
ArrayList<>());
   private final List<CloudSolrClient> solrClients = Collections.synchronizedList(new
ArrayList<>());
-
   private static final String COLLECTION = SolrTestCaseJ4.DEFAULT_TEST_COLLECTION_NAME;
 
   public static class MockZKController{
@@ -306,6 +299,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
   @Before
   public void setUp() throws Exception {
     testDone = false;
+
     super.setUp();
   }
 
@@ -322,6 +316,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
     }
 
     server = null;
+
   }
 
   @After
@@ -1428,11 +1423,22 @@ public class OverseerTest extends SolrTestCaseJ4 {
         Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
     when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone);  // Allow retry on
session expiry
     when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());
+    ClusterSingletons singletons = new ClusterSingletons(() -> true, r -> r.run());
+    // don't wait for all singletons
+    singletons.setReady();
+    FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("clusterSingletons"),
singletons);
     FieldSetter.setField(zkController, ZkController.class.getDeclaredField("zkClient"), zkClient);
     FieldSetter.setField(zkController, ZkController.class.getDeclaredField("cc"), mockAlwaysUpCoreContainer);
     when(zkController.getCoreContainer()).thenReturn(mockAlwaysUpCoreContainer);
     when(zkController.getZkClient()).thenReturn(zkClient);
     when(zkController.getZkStateReader()).thenReturn(reader);
+    // primitive support for CC.runAsync
+    doAnswer(invocable -> {
+      Runnable r = invocable.getArgument(0);
+      Thread t = new Thread(r);
+      t.start();
+      return null;
+    }).when(mockAlwaysUpCoreContainer).runAsync(any(Runnable.class));
 
     when(zkController.getLeaderProps(anyString(), anyString(), anyInt())).thenCallRealMethod();
     when(zkController.getLeaderProps(anyString(), anyString(), anyInt(), anyBoolean())).thenCallRealMethod();
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 4c37c17..6f7a18a 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -37,10 +37,12 @@ import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.request.beans.Package;
 import org.apache.solr.client.solrj.request.beans.PluginMeta;
 import org.apache.solr.client.solrj.response.V2Response;
+import org.apache.solr.cloud.ClusterSingleton;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.NavigableObject;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.filestore.PackageStoreAPI;
 import org.apache.solr.filestore.TestDistribPackageStore;
@@ -92,8 +94,11 @@ public class TestContainerPlugin extends SolrCloudTestCase {
       expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
 
       //test with an invalid class
-      plugin.klass = C1.class.getName();
-      expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
+      // XXX (ab) in order to support ClusterSingleton we allow adding
+      // plugins without Api EndPoints
+
+//      plugin.klass = C1.class.getName();
+//      expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
 
       //test with a valid class. This should succeed now
       plugin.klass = C3.class.getName();
@@ -170,6 +175,31 @@ public class TestContainerPlugin extends SolrCloudTestCase {
           .withMethod(GET)
           .build()
           .process(cluster.getSolrClient()));
+
+      // test ClusterSingleton plugin
+      plugin.name = "clusterSingleton";
+      plugin.klass = C6.class.getName();
+      req.process(cluster.getSolrClient());
+
+      //just check if the plugin is indeed registered
+      readPluginState = new V2Request.Builder("/cluster/plugin")
+          .forceV2(true)
+          .withMethod(GET)
+          .build();
+      rsp = readPluginState.process(cluster.getSolrClient());
+      assertEquals(C6.class.getName(), rsp._getStr("/plugin/clusterSingleton/class", null));
+
+      assertTrue("ccProvided", C6.ccProvided);
+      assertTrue("startCalled", C6.startCalled);
+      assertFalse("stopCalled", C6.stopCalled);
+      // kill the Overseer leader
+      for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+        if (!jetty.getCoreContainer().getZkController().getOverseer().isClosed()) {
+          cluster.stopJettySolrRunner(jetty);
+          cluster.waitForJettyToStop(jetty);
+        }
+      }
+      assertTrue("stopCalled", C6.stopCalled);
     } finally {
       cluster.shutdown();
     }
@@ -289,6 +319,45 @@ public class TestContainerPlugin extends SolrCloudTestCase {
     }
   }
 
+  public static class C6 implements ClusterSingleton {
+    static boolean startCalled = false;
+    static boolean stopCalled = false;
+    static boolean ccProvided = false;
+
+    private State state = State.STOPPED;
+
+    public C6(CoreContainer cc) {
+      if (cc != null) {
+        ccProvided = true;
+      }
+    }
+
+    @Override
+    public String getName() {
+      return "C6";
+    }
+
+    @Override
+    public void start() throws Exception {
+      state = State.STARTING;
+      startCalled = true;
+      state = State.RUNNING;
+    }
+
+    @Override
+    public State getState() {
+      return state;
+    }
+
+    @Override
+    public void stop() {
+      state = State.STOPPING;
+      stopCalled = true;
+      state = State.STOPPED;
+    }
+  }
+
+
   public static class C5 implements ResourceLoaderAware {
     static ByteBuffer classData;
     private  SolrResourceLoader resourceLoader;


Mime
View raw message