This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14749-api
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 42adade79bde58e2b8ab1e3cb6d5f2990fb5e611
Author: Andrzej Bialecki <ab@apache.org>
AuthorDate: Thu Oct 8 12:23:53 2020 +0200
SOLR-14749: Keep only the API part here, remove all implementation details.
---
.../org/apache/solr/cloud/ClusterSingleton.java | 31 ++-
.../src/java/org/apache/solr/cloud/Overseer.java | 63 ------
.../apache/solr/cluster/events/ClusterEvent.java | 10 +-
.../solr/cluster/events/ClusterEventListener.java | 4 +-
.../solr/cluster/events/ClusterEventProducer.java | 75 ++-----
.../events/ClusterPropertiesChangedEvent.java | 6 -
.../solr/cluster/events/CollectionsAddedEvent.java | 6 -
.../cluster/events/CollectionsRemovedEvent.java | 7 -
.../apache/solr/cluster/events/NodesDownEvent.java | 7 -
.../apache/solr/cluster/events/NodesUpEvent.java | 7 -
.../solr/cluster/events/ReplicasDownEvent.java | 7 -
.../events/impl/ClusterEventProducerImpl.java | 241 ---------------------
.../impl/CollectionsRepairEventListener.java | 185 ----------------
.../solr/cluster/events/impl/package-info.java | 23 --
.../java/org/apache/solr/core/CoreContainer.java | 109 ----------
15 files changed, 42 insertions(+), 739 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java b/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
index 1ae1eed..70b02fb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
@@ -17,12 +17,12 @@
package org.apache.solr.cloud;
/**
- * Intended for {@link org.apache.solr.core.CoreContainer} plugins that should be
- * enabled only one instance per cluster.
- * <p>Components that implement this interface are always in one of two states:
+ * Intended for plugins of which only one instance should be enabled per cluster.
+ * <p>Components that implement this interface are always in one of these states:
* <ul>
* <li>STOPPED - the default state. The component is idle and does not perform
* any functions. It should also avoid holding any resources.</li>
+ * <li>STARTING - the component is in the process of starting and is not yet active.</li>
* <li>RUNNING - the component is active.</li>
* </ul>
* <p>Components must be prepared to change these states multiple times in their
@@ -33,28 +33,39 @@ package org.apache.solr.cloud;
*/
public interface ClusterSingleton {
+ enum State {
+ /** Component is idle. */
+ STOPPED,
+ /** Component is starting. */
+ STARTING,
+ /** Component is active. */
+ RUNNING,
+ /** Component is stopping. */
+ STOPPING
+ }
+
/**
* Unique name of this singleton. Used for registration.
*/
String getName();
/**
- * Start the operation of the component. On return the component is assumed
- * to be in the RUNNING state.
+ * Start the operation of the component. Initially this method should set
+ * the state to STARTING, and on success it should set the state to RUNNING.
* @throws Exception on startup errors. The component should revert to the
* STOPPED state.
*/
void start() throws Exception;
/**
- * Returns true if the component is in the RUNNING state, false otherwise.
+ * Returns the current state of the component.
*/
- boolean isRunning();
+ State getState();
/**
- * Stop the operation of the component. On return the component is assumed
- * to be in the STOPPED state. Components should also avoid holding any resources
- * in the STOPPED state.
+ * Stop the operation of the component. Initially this method should set
+ * the state to STOPPING, and on return it should set the state to STOPPED.
+ * Components should also avoid holding any resource when in STOPPED state.
*/
void stop();
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 2465f8a..bb405ad 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,8 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.lucene.util.Version;
@@ -49,7 +47,6 @@ import org.apache.solr.cloud.overseer.ReplicaMutator;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
-import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
@@ -659,8 +656,6 @@ public class Overseer implements SolrCloseable {
}
});
- startClusterSingletons();
-
assert ObjectReleaseTracker.track(this);
}
@@ -780,59 +775,6 @@ public class Overseer implements SolrCloseable {
}
}
- /**
- * Start {@link ClusterSingleton} plugins when we become the leader.
- */
- public void startClusterSingletons() {
- CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
- final Runnable initializer = () -> {
- if (isClosed()) {
- return;
- }
- try {
- singletons.waitUntilReady(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Interrupted initialization of ClusterSingleton-s");
- return;
- } catch (TimeoutException te) {
- log.warn("Timed out during initialization of ClusterSingleton-s");
- return;
- }
- singletons.getSingletons().forEach((name, singleton) -> {
- try {
- singleton.start();
- if (singleton instanceof ClusterEventListener) {
- getCoreContainer().getClusterEventProducer().registerListener((ClusterEventListener)
singleton);
- }
- } catch (Exception e) {
- log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
- }
- });
- };
- if (singletons.isReady()) {
- // wait until all singleton-s are ready for the first startup
- getCoreContainer().runAsync(initializer);
- } else {
- initializer.run();
- }
- }
-
- /**
- * Stop {@link ClusterSingleton} plugins when we lose leadership.
- */
- private void stopClusterSingletons() {
- CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
- if (singletons == null) {
- return;
- }
- singletons.getSingletons().forEach((name, singleton) -> {
- if (singleton instanceof ClusterEventListener) {
- getCoreContainer().getClusterEventProducer().unregisterListener((ClusterEventListener)
singleton);
- }
- singleton.stop();
- });
- }
-
public Stats getStats() {
return stats;
}
@@ -872,14 +814,9 @@ public class Overseer implements SolrCloseable {
if (this.id != null) {
log.info("Overseer (id={}) closing", id);
}
- // stop singletons only on the leader
- if (!this.closed) {
- stopClusterSingletons();
- }
this.closed = true;
doClose();
-
assert ObjectReleaseTracker.release(this);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
index 2dc7a32..1929f0c 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
@@ -16,15 +16,12 @@
*/
package org.apache.solr.cluster.events;
-import org.apache.solr.common.MapWriter;
-
-import java.io.IOException;
import java.time.Instant;
/**
* Cluster-level event.
*/
-public interface ClusterEvent extends MapWriter {
+public interface ClusterEvent {
enum EventType {
/** One or more nodes went down. */
@@ -49,9 +46,4 @@ public interface ClusterEvent extends MapWriter {
/** Get event timestamp. This is the instant when the event was generated (not necessarily
when
* the underlying condition first occurred). */
Instant getTimestamp();
-
- default void writeMap(EntryWriter ew) throws IOException {
- ew.put("type", getType());
- ew.put("timestamp", getTimestamp().toEpochMilli());
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
index 592f118..6f84457 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
@@ -16,15 +16,13 @@
*/
package org.apache.solr.cluster.events;
-import org.apache.solr.cloud.ClusterSingleton;
-
/**
* Components that want to be notified of cluster-wide events should use this.
*
* XXX should this work only for ClusterSingleton-s? some types of events may be
* XXX difficult (or pointless) to propagate to every node.
*/
-public interface ClusterEventListener extends ClusterSingleton {
+public interface ClusterEventListener {
/**
* Handle the event. Implementations should be non-blocking - if any long
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
index 1c2327b..09c3142 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
@@ -16,30 +16,10 @@
*/
package org.apache.solr.cluster.events;
-import org.apache.solr.cloud.ClusterSingleton;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Component that produces {@link ClusterEvent} instances.
*/
-public interface ClusterEventProducer extends ClusterSingleton {
-
- String PLUGIN_NAME = "clusterEventProducer";
-
- default String getName() {
- return PLUGIN_NAME;
- }
-
- /**
- * Returns a modifiable map of event types and listeners to process events
- * of a given type.
- */
- Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners();
+public interface ClusterEventProducer {
/**
* Register an event listener for processing the specified event types.
@@ -48,27 +28,13 @@ public interface ClusterEventProducer extends ClusterSingleton {
* @param eventTypes non-empty array of event types that this listener
* is being registered for. If this is null or empty then all types will
be used.
*/
- default void registerListener(ClusterEventListener listener, ClusterEvent.EventType...
eventTypes) throws Exception {
- Objects.requireNonNull(listener);
- if (eventTypes == null || eventTypes.length == 0) {
- eventTypes = ClusterEvent.EventType.values();
- }
- for (ClusterEvent.EventType type : eventTypes) {
- Set<ClusterEventListener> perType = getEventListeners().computeIfAbsent(type,
t -> ConcurrentHashMap.newKeySet());
- perType.add(listener);
- }
- }
+ void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes)
throws Exception;
/**
- * Unregister an event listener.
+ * Unregister an event listener for all event types.
* @param listener non-null listener.
*/
- default void unregisterListener(ClusterEventListener listener) {
- Objects.requireNonNull(listener);
- getEventListeners().forEach((type, listeners) -> {
- listeners.remove(listener);
- });
- }
+ void unregisterListener(ClusterEventListener listener);
/**
* Unregister an event listener for specified event types.
@@ -76,25 +42,22 @@ public interface ClusterEventProducer extends ClusterSingleton {
* @param eventTypes event types from which the listener will be unregistered. If this
* is null or empty then all event types will be used
*/
- default void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType...
eventTypes) {
- Objects.requireNonNull(listener);
- if (eventTypes == null || eventTypes.length == 0) {
- eventTypes = ClusterEvent.EventType.values();
+ void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes);
+
+ static ClusterEventProducer NO_OP_PRODUCER = new ClusterEventProducer() {
+ @Override
+ public void registerListener(ClusterEventListener listener, ClusterEvent.EventType...
eventTypes) throws Exception {
+
}
- for (ClusterEvent.EventType type : eventTypes) {
- getEventListeners()
- .getOrDefault(type, Collections.emptySet())
- .remove(listener);
+
+ @Override
+ public void unregisterListener(ClusterEventListener listener) {
+
}
- }
- /**
- * Fire an event. This method will call registered listeners that subscribed to the
- * type of event being passed.
- * @param event cluster event
- */
- default void fireEvent(ClusterEvent event) {
- getEventListeners().getOrDefault(event.getType(), Collections.emptySet())
- .forEach(listener -> listener.onEvent(event));
- }
+ @Override
+ public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType...
eventTypes) {
+
+ }
+ };
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
index ad9c0b8..ee513d8 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.cluster.events;
-import java.io.IOException;
import java.util.Map;
/**
@@ -31,9 +30,4 @@ public interface ClusterPropertiesChangedEvent extends ClusterEvent {
Map<String, Object> getNewClusterProperties();
- @Override
- default void writeMap(EntryWriter ew) throws IOException {
- ClusterEvent.super.writeMap(ew);
- ew.put("newClusterProperties", getNewClusterProperties());
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
index 78046f8..8614769 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.cluster.events;
-import java.io.IOException;
import java.util.Iterator;
/**
@@ -31,9 +30,4 @@ public interface CollectionsAddedEvent extends ClusterEvent {
Iterator<String> getCollectionNames();
- @Override
- default void writeMap(EntryWriter ew) throws IOException {
- ClusterEvent.super.writeMap(ew);
- ew.put("collectionNames", getCollectionNames());
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
index a93be4c..e6fc64e 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.cluster.events;
-import java.io.IOException;
import java.util.Iterator;
/**
@@ -30,10 +29,4 @@ public interface CollectionsRemovedEvent extends ClusterEvent {
}
Iterator<String> getCollectionNames();
-
- @Override
- default void writeMap(EntryWriter ew) throws IOException {
- ClusterEvent.super.writeMap(ew);
- ew.put("collectionNames", getCollectionNames());
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
index 5001ccb..a8e7a2e 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.cluster.events;
-import java.io.IOException;
import java.util.Iterator;
/**
@@ -30,10 +29,4 @@ public interface NodesDownEvent extends ClusterEvent {
}
Iterator<String> getNodeNames();
-
- @Override
- default void writeMap(EntryWriter ew) throws IOException {
- ClusterEvent.super.writeMap(ew);
- ew.put("nodeNames", getNodeNames());
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
index fa08f85..f83bf91 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.cluster.events;
-import java.io.IOException;
import java.util.Iterator;
/**
@@ -30,10 +29,4 @@ public interface NodesUpEvent extends ClusterEvent {
}
Iterator<String> getNodeNames();
-
- @Override
- default void writeMap(EntryWriter ew) throws IOException {
- ClusterEvent.super.writeMap(ew);
- ew.put("nodeNames", getNodeNames());
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
index 1d3ce9b..69ec48c 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
@@ -18,7 +18,6 @@ package org.apache.solr.cluster.events;
import org.apache.solr.common.cloud.Replica;
-import java.io.IOException;
import java.util.Iterator;
/**
@@ -32,10 +31,4 @@ public interface ReplicasDownEvent extends ClusterEvent {
}
Iterator<Replica> getReplicas();
-
- @Override
- default void writeMap(EntryWriter ew) throws IOException {
- ClusterEvent.super.writeMap(ew);
- ew.put("replicas", getReplicas());
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
deleted file mode 100644
index 034fa8a..0000000
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cluster.events.impl;
-
-import java.lang.invoke.MethodHandles;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cluster.events.ClusterPropertiesChangedEvent;
-import org.apache.solr.cluster.events.ClusterEvent;
-import org.apache.solr.cluster.events.ClusterEventListener;
-import org.apache.solr.cluster.events.ClusterEventProducer;
-import org.apache.solr.cloud.ClusterSingleton;
-import org.apache.solr.cluster.events.CollectionsAddedEvent;
-import org.apache.solr.cluster.events.CollectionsRemovedEvent;
-import org.apache.solr.cluster.events.NodesDownEvent;
-import org.apache.solr.cluster.events.NodesUpEvent;
-import org.apache.solr.common.cloud.CloudCollectionsListener;
-import org.apache.solr.common.cloud.ClusterPropertiesListener;
-import org.apache.solr.common.cloud.LiveNodesListener;
-import org.apache.solr.core.CoreContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link ClusterEventProducer}.
- * <h3>Implementation notes</h3>
- * <p>For each cluster event relevant listeners are always invoked sequentially
- * (not in parallel) and in arbitrary order. This means that if any listener blocks the
- * processing other listeners may be invoked much later or not at all.</p>
- */
-public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSingleton {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners
= new HashMap<>();
- private CoreContainer coreContainer;
- private LiveNodesListener liveNodesListener;
- private CloudCollectionsListener cloudCollectionsListener;
- private ClusterPropertiesListener clusterPropertiesListener;
- private ZkController zkController;
- private volatile boolean running;
-
- private final Set<ClusterEvent.EventType> supportedEvents =
- new HashSet<>(Arrays.asList(
- ClusterEvent.EventType.NODES_DOWN,
- ClusterEvent.EventType.NODES_UP,
- ClusterEvent.EventType.COLLECTIONS_ADDED,
- ClusterEvent.EventType.COLLECTIONS_REMOVED,
- ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
- ));
-
- public ClusterEventProducerImpl(CoreContainer coreContainer) {
- this.coreContainer = coreContainer;
- }
-
- // ClusterSingleton lifecycle methods
- @Override
- public void start() {
- if (coreContainer == null) {
- liveNodesListener = null;
- cloudCollectionsListener = null;
- clusterPropertiesListener = null;
- return;
- }
- this.zkController = this.coreContainer.getZkController();
-
- // clean up any previous instances
- doStop();
-
- // register liveNodesListener
- liveNodesListener = (oldNodes, newNodes) -> {
- // already closed but still registered
- if (!running) {
- // remove the listener
- return true;
- }
- // spurious event, ignore but keep listening
- if (oldNodes.equals(newNodes)) {
- return false;
- }
- final Instant now = Instant.now();
- final Set<String> downNodes = new HashSet<>(oldNodes);
- downNodes.removeAll(newNodes);
- if (!downNodes.isEmpty()) {
- fireEvent(new NodesDownEvent() {
- @Override
- public Iterator<String> getNodeNames() {
- return downNodes.iterator();
- }
-
- @Override
- public Instant getTimestamp() {
- return now;
- }
- });
- }
- final Set<String> upNodes = new HashSet<>(newNodes);
- upNodes.removeAll(oldNodes);
- if (!upNodes.isEmpty()) {
- fireEvent(new NodesUpEvent() {
- @Override
- public Iterator<String> getNodeNames() {
- return upNodes.iterator();
- }
-
- @Override
- public Instant getTimestamp() {
- return now;
- }
- });
- }
- return false;
- };
- zkController.zkStateReader.registerLiveNodesListener(liveNodesListener);
-
- cloudCollectionsListener = ((oldCollections, newCollections) -> {
- if (oldCollections.equals(newCollections)) {
- return;
- }
- final Instant now = Instant.now();
- final Set<String> removed = new HashSet<>(oldCollections);
- removed.removeAll(newCollections);
- if (!removed.isEmpty()) {
- fireEvent(new CollectionsRemovedEvent() {
- @Override
- public Iterator<String> getCollectionNames() {
- return removed.iterator();
- }
-
- @Override
- public Instant getTimestamp() {
- return now;
- }
- });
- }
- final Set<String> added = new HashSet<>(newCollections);
- added.removeAll(oldCollections);
- if (!added.isEmpty()) {
- fireEvent(new CollectionsAddedEvent() {
- @Override
- public Iterator<String> getCollectionNames() {
- return added.iterator();
- }
-
- @Override
- public Instant getTimestamp() {
- return now;
- }
- });
- }
- });
- zkController.zkStateReader.registerCloudCollectionsListener(cloudCollectionsListener);
-
- clusterPropertiesListener = (newProperties) -> {
- fireEvent(new ClusterPropertiesChangedEvent() {
- final Instant now = Instant.now();
- @Override
- public Map<String, Object> getNewClusterProperties() {
- return newProperties;
- }
-
- @Override
- public Instant getTimestamp() {
- return now;
- }
- });
- return false;
- };
- zkController.zkStateReader.registerClusterPropertiesListener(clusterPropertiesListener);
-
- // XXX register collection state listener?
- // XXX not sure how to efficiently monitor for REPLICA_DOWN events
-
- running = true;
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- @Override
- public void stop() {
- doStop();
- running = false;
- }
-
- private void doStop() {
- if (liveNodesListener != null) {
- zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
- }
- if (cloudCollectionsListener != null) {
- zkController.zkStateReader.removeCloudCollectionsListener(cloudCollectionsListener);
- }
- if (clusterPropertiesListener != null) {
- zkController.zkStateReader.removeClusterPropertiesListener(clusterPropertiesListener);
- }
- liveNodesListener = null;
- cloudCollectionsListener = null;
- clusterPropertiesListener = null;
- }
-
- @Override
- public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes)
throws Exception {
- try {
- for (ClusterEvent.EventType type : eventTypes) {
- if (!supportedEvents.contains(type)) {
- log.warn("event type {} not supported yet.", type);
- }
- }
- } catch (Throwable e) {
- throw new Exception(e);
- }
- ClusterEventProducer.super.registerListener(listener, eventTypes);
- }
-
- @Override
- public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners()
{
- return listeners;
- }
-}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
deleted file mode 100644
index 42dcde3..0000000
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cluster.events.impl;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.cloud.api.collections.Assign;
-import org.apache.solr.cluster.events.ClusterEvent;
-import org.apache.solr.cluster.events.ClusterEventListener;
-import org.apache.solr.cluster.events.NodesDownEvent;
-import org.apache.solr.cluster.events.ReplicasDownEvent;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.core.CoreContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is an illustration how to re-implement the combination of 8x
- * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replication
factor.
- * <p>NOTE: there's no support for 'waitFor' yet.</p>
- * <p>NOTE 2: this functionality would be probably more reliable when executed also
as a
- * periodically scheduled check - both as a reactive (listener) and proactive (scheduled)
measure.</p>
- */
-public class CollectionsRepairEventListener implements ClusterEventListener {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public static final String PLUGIN_NAME = "collectionsRepairListener";
- private static final String ASYNC_ID_PREFIX = "_async_" + PLUGIN_NAME;
- private static final AtomicInteger counter = new AtomicInteger();
-
- private final SolrClient solrClient;
- private final SolrCloudManager solrCloudManager;
-
- private volatile boolean running = false;
-
- public CollectionsRepairEventListener(CoreContainer cc) {
- this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
- this.solrCloudManager = cc.getZkController().getSolrCloudManager();
- }
-
- @Override
- public String getName() {
- return PLUGIN_NAME;
- }
-
- @Override
- public void onEvent(ClusterEvent event) {
- if (!isRunning()) {
- // ignore the event
- return;
- }
- switch (event.getType()) {
- case NODES_DOWN:
- handleNodesDown((NodesDownEvent) event);
- break;
- case REPLICAS_DOWN:
- handleReplicasDown((ReplicasDownEvent) event);
- break;
- default:
- log.warn("Unsupported event {}, ignoring...", event);
- }
- }
-
- private void handleNodesDown(NodesDownEvent event) {
- // collect all lost replicas
- // collection / positions
- Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
- try {
- ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState();
- Set<String> lostNodeNames = new HashSet<>();
- event.getNodeNames().forEachRemaining(lostNodeNames::add);
- clusterState.forEachCollection(coll -> {
- // shard / type / count
- Map<String, Map<Replica.Type, AtomicInteger>> lostReplicas = new HashMap<>();
- coll.forEachReplica((shard, replica) -> {
- if (lostNodeNames.contains(replica.getNodeName())) {
- lostReplicas.computeIfAbsent(shard, s -> new HashMap<>())
- .computeIfAbsent(replica.type, t -> new AtomicInteger())
- .incrementAndGet();
- }
- });
- Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(solrCloudManager,
clusterState, coll);
- lostReplicas.forEach((shard, types) -> {
- Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder()
- .forCollection(coll.getName())
- .forShard(Collections.singletonList(shard));
- types.forEach((type, count) -> {
- switch (type) {
- case NRT:
- assignRequestBuilder.assignNrtReplicas(count.get());
- break;
- case PULL:
- assignRequestBuilder.assignPullReplicas(count.get());
- break;
- case TLOG:
- assignRequestBuilder.assignTlogReplicas(count.get());
- break;
- }
- });
- Assign.AssignRequest assignRequest = assignRequestBuilder.build();
- try {
- List<ReplicaPosition> positions = assignStrategy.assign(solrCloudManager,
assignRequest);
- newPositions.put(coll.getName(), positions);
- } catch (Exception e) {
- log.warn("Exception computing positions for {}/{}: {}", coll.getName(), shard,
e);
- return;
- }
- });
- });
- } catch (IOException e) {
- log.warn("Exception getting cluster state", e);
- return;
- }
-
- // send ADDREPLICA admin requests for each lost replica
- // XXX should we use 'async' for that, to avoid blocking here?
- List<CollectionAdminRequest.AddReplica> addReplicas = new ArrayList<>();
- newPositions.forEach((collection, positions) -> {
- positions.forEach(position -> {
- CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest
- .addReplicaToShard(collection, position.shard, position.type);
- addReplica.setNode(position.node);
- addReplica.setAsyncId(ASYNC_ID_PREFIX + counter.incrementAndGet());
- addReplicas.add(addReplica);
- });
- });
- addReplicas.forEach(addReplica -> {
- try {
- solrClient.request(addReplica);
- } catch (Exception e) {
- log.warn("Exception calling ADDREPLICA {}: {}", addReplica.getParams().toQueryString(),
e);
- }
- });
-
- // ... and DELETERPLICA for lost ones?
- }
-
- private void handleReplicasDown(ReplicasDownEvent event) {
- // compute new placements for all replicas that went down
- // send ADDREPLICA admin request for each lost replica
- }
-
- @Override
- public void start() throws Exception {
- running = true;
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- @Override
- public void stop() {
- running = false;
- }
-}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java
deleted file mode 100644
index 2c115b6..0000000
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Default implementation of {@link org.apache.solr.cluster.events.ClusterEventProducer}.
- */
-package org.apache.solr.cluster.events.impl;
-
-
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 1427dad..bcc2039 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -39,12 +39,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
@@ -71,13 +68,8 @@ import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.Credential
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cluster.events.ClusterEvent;
-import org.apache.solr.cluster.events.ClusterEventListener;
-import org.apache.solr.cluster.events.ClusterEventProducer;
-import org.apache.solr.cluster.events.impl.ClusterEventProducerImpl;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -176,72 +168,6 @@ public class CoreContainer {
}
}
- public static class ClusterSingletons {
- private Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
- // we use this latch to delay the initial startup of singletons, due to
- // the leader election occurring in parallel with the rest of the load() method.
- private CountDownLatch readyLatch = new CountDownLatch(1);
-
- public Map<String, ClusterSingleton> getSingletons() {
- return singletonMap;
- }
-
- public boolean isReady() {
- return readyLatch.getCount() > 0;
- }
-
- public void setReady() {
- readyLatch.countDown();
- }
-
- public void waitUntilReady(long timeout, TimeUnit timeUnit)
- throws InterruptedException, TimeoutException {
- boolean await = readyLatch.await(timeout, timeUnit);
- if (!await) {
- throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
- }
- }
- }
-
- /**
- * This class helps in handling the initial registration of plugin-based listeners,
- * when both the final {@link ClusterEventProducer} implementation and listeners
- * are configured using plugins.
- */
- public static class InitialClusterEventProducer implements ClusterEventProducer {
- Map<ClusterEvent.EventType, Set<ClusterEventListener>> initialListeners = new HashMap<>();
-
- @Override
- public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
- return initialListeners;
- }
-
- public void transferListeners(ClusterEventProducer target) {
- initialListeners.forEach((type, listeners) -> {
- listeners.forEach(listener -> {
- try {
- target.registerListener(listener, type);
- } catch (Exception e) {
- log.warn("Unable to register event listener for type {}: {}", type, e);
- }
- });
- });
- }
-
- @Override
- public void start() throws Exception {
- }
-
- @Override
- public boolean isRunning() {
- return false;
- }
-
- @Override
- public void stop() {
- }
- }
-
private volatile PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
/**
@@ -317,11 +243,8 @@ public class CoreContainer {
private volatile SolrClientCache solrClientCache;
- private volatile ClusterEventProducer clusterEventProducer = new InitialClusterEventProducer();
-
private final ObjectCache objectCache = new ObjectCache();
- private final ClusterSingletons clusterSingletons = new ClusterSingletons();
private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader;
@@ -969,31 +892,7 @@ public class CoreContainer {
ContainerPluginsApi containerPluginsApi = new ContainerPluginsApi(this);
containerHandlers.getApiBag().registerObject(containerPluginsApi.readAPI);
containerHandlers.getApiBag().registerObject(containerPluginsApi.editAPI);
-
- // init ClusterSingleton-s
-
- // register the handlers that are also ClusterSingleton
- containerHandlers.keySet().forEach(handlerName -> {
- SolrRequestHandler handler = containerHandlers.get(handlerName);
- if (handler instanceof ClusterSingleton) {
- clusterSingletons.singletonMap.put(handlerName, (ClusterSingleton) handler);
- }
- });
- // create the ClusterEventProducer
- InitialClusterEventProducer initialClusterEventProducer = (InitialClusterEventProducer) clusterEventProducer;
- CustomContainerPlugins.ApiInfo clusterEventProducerInfo = customContainerPlugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
- if (clusterEventProducerInfo != null) {
- clusterEventProducer = (ClusterEventProducer) clusterEventProducerInfo.getInstance();
- } else {
- clusterEventProducer = new ClusterEventProducerImpl(this);
- clusterSingletons.singletonMap.put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
- }
- // transfer those listeners that were already registered to the initial impl
- initialClusterEventProducer.transferListeners(clusterEventProducer);
-
- clusterSingletons.setReady();
zkSys.getZkController().checkOverseerDesignate();
-
}
// This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
@@ -2191,14 +2090,6 @@ public class CoreContainer {
return customContainerPlugins;
}
- public ClusterSingletons getClusterSingletons() {
- return clusterSingletons;
- }
-
- public ClusterEventProducer getClusterEventProducer() {
- return clusterEventProducer;
- }
-
static {
ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
}
|