lucene-commits mailing list archives

From sha...@apache.org
Subject [08/19] lucene-solr:branch_7x: Merge branch 'feature/autoscaling' of https://git-wip-us.apache.org/repos/asf/lucene-solr
Date Mon, 09 Oct 2017 15:31:00 GMT
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccf1bf72/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
new file mode 100644
index 0000000..bbfd098
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Trigger for the {@link TriggerEventType#NODEADDED} event
+ */
+public class NodeAddedTrigger extends TriggerBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final String name;
+  private final Map<String, Object> properties;
+  private final CoreContainer container;
+  private final ZkController zkController;
+  private final List<TriggerAction> actions;
+  private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
+  private final boolean enabled;
+  private final int waitForSecond;
+  private final TriggerEventType eventType;
+  private final TimeSource timeSource;
+
+  private boolean isClosed = false;
+
+  private Set<String> lastLiveNodes;
+
+  private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
+
+  public NodeAddedTrigger(String name, Map<String, Object> properties,
+                          CoreContainer container, ZkController zkController) {
+    super(zkController.getZkClient());
+    this.name = name;
+    this.properties = properties;
+    this.container = container;
+    this.zkController = zkController;
+    this.timeSource = TimeSource.CURRENT_TIME;
+    this.processorRef = new AtomicReference<>();
+    List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
+    if (o != null && !o.isEmpty()) {
+      actions = new ArrayList<>(3);
+      for (Map<String, String> map : o) {
+        TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
+        actions.add(action);
+      }
+    } else {
+      actions = Collections.emptyList();
+    }
+    lastLiveNodes = new HashSet<>(zkController.getZkStateReader().getClusterState().getLiveNodes());
+    log.debug("Initial livenodes: {}", lastLiveNodes);
+    this.enabled = Boolean.parseBoolean(String.valueOf(properties.getOrDefault("enabled", "true")));
+    this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
+    this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
+    log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
+  }
+
+  @Override
+  public void init() {
+    List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
+    if (o != null && !o.isEmpty()) {
+      for (int i = 0; i < o.size(); i++) {
+        Map<String, String> map = o.get(i);
+        actions.get(i).init(map);
+      }
+    }
+    // pick up added nodes for which marker paths were created
+    try {
+      List<String> added = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+      added.forEach(n -> {
+        // don't add nodes that have since gone away
+        if (lastLiveNodes.contains(n)) {
+          log.debug("Adding node from marker path: {}", n);
+          nodeNameVsTimeAdded.put(n, timeSource.getTime());
+        }
+        removeMarker(n);
+      });
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception retrieving nodeLost markers", e);
+    }
+
+  }
+
+  @Override
+  public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
+    processorRef.set(processor);
+  }
+
+  @Override
+  public AutoScaling.TriggerEventProcessor getProcessor() {
+    return processorRef.get();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public TriggerEventType getEventType() {
+    return eventType;
+  }
+
+  @Override
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  @Override
+  public int getWaitForSecond() {
+    return waitForSecond;
+  }
+
+  @Override
+  public Map<String, Object> getProperties() {
+    return properties;
+  }
+
+  @Override
+  public List<TriggerAction> getActions() {
+    return actions;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof NodeAddedTrigger) {
+      NodeAddedTrigger that = (NodeAddedTrigger) obj;
+      return this.name.equals(that.name)
+          && this.properties.equals(that.properties);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, properties);
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (this) {
+      isClosed = true;
+      IOUtils.closeWhileHandlingException(actions);
+    }
+  }
+
+  @Override
+  public void restoreState(AutoScaling.Trigger old) {
+    assert old.isClosed();
+    if (old instanceof NodeAddedTrigger) {
+      NodeAddedTrigger that = (NodeAddedTrigger) old;
+      assert this.name.equals(that.name);
+      this.lastLiveNodes = new HashSet<>(that.lastLiveNodes);
+      this.nodeNameVsTimeAdded = new HashMap<>(that.nodeNameVsTimeAdded);
+    } else  {
+      throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
+          "Unable to restore state from an unknown type of trigger");
+    }
+  }
+
+  @Override
+  protected Map<String, Object> getState() {
+    Map<String,Object> state = new HashMap<>();
+    state.put("lastLiveNodes", lastLiveNodes);
+    state.put("nodeNameVsTimeAdded", nodeNameVsTimeAdded);
+    return state;
+  }
+
+  @Override
+  protected void setState(Map<String, Object> state) {
+    this.lastLiveNodes.clear();
+    this.nodeNameVsTimeAdded.clear();
+    Collection<String> lastLiveNodes = (Collection<String>)state.get("lastLiveNodes");
+    if (lastLiveNodes != null) {
+      this.lastLiveNodes.addAll(lastLiveNodes);
+    }
+    Map<String,Long> nodeNameVsTimeAdded = (Map<String,Long>)state.get("nodeNameVsTimeAdded");
+    if (nodeNameVsTimeAdded != null) {
+      this.nodeNameVsTimeAdded.putAll(nodeNameVsTimeAdded);
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      synchronized (this) {
+        if (isClosed) {
+          log.warn("NodeAddedTrigger ran but was already closed");
+          throw new RuntimeException("Trigger has been closed");
+        }
+      }
+      log.debug("Running NodeAddedTrigger {}", name);
+
+      ZkStateReader reader = zkController.getZkStateReader();
+      Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
+      log.debug("Found livenodes: {}", newLiveNodes);
+
+      // have any nodes that we were tracking been removed from the cluster?
+      // if so, remove them from the tracking map
+      Set<String> trackingKeySet = nodeNameVsTimeAdded.keySet();
+      trackingKeySet.retainAll(newLiveNodes);
+
+      // have any new nodes been added?
+      Set<String> copyOfNew = new HashSet<>(newLiveNodes);
+      copyOfNew.removeAll(lastLiveNodes);
+      copyOfNew.forEach(n -> {
+        long eventTime = timeSource.getTime();
+        log.debug("Tracking new node: {} at time {}", n, eventTime);
+        nodeNameVsTimeAdded.put(n, eventTime);
+      });
+
+      // has enough time expired to trigger events for a node?
+      List<String> nodeNames = new ArrayList<>();
+      List<Long> times = new ArrayList<>();
+      for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeAdded.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Long> entry = it.next();
+        String nodeName = entry.getKey();
+        Long timeAdded = entry.getValue();
+        long now = timeSource.getTime();
+        if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
+          nodeNames.add(nodeName);
+          times.add(timeAdded);
+        }
+      }
+      AutoScaling.TriggerEventProcessor processor = processorRef.get();
+      if (!nodeNames.isEmpty()) {
+        if (processor != null) {
+          log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}", name, nodeNames, times);
+          if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames))) {
+            // remove from tracking set only if the fire was accepted
+            nodeNames.forEach(n -> {
+              nodeNameVsTimeAdded.remove(n);
+              removeMarker(n);
+            });
+          }
+        } else  {
+          nodeNames.forEach(n -> {
+            nodeNameVsTimeAdded.remove(n);
+            removeMarker(n);
+          });
+        }
+      }
+      lastLiveNodes = new HashSet<>(newLiveNodes);
+    } catch (RuntimeException e) {
+      log.error("Unexpected exception in NodeAddedTrigger", e);
+    }
+  }
+
+  private void removeMarker(String nodeName) {
+    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
+    try {
+      if (zkClient.exists(path, true)) {
+        zkClient.delete(path, -1, true);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.debug("Exception removing nodeAdded marker " + nodeName, e);
+    }
+
+  }
+
+  @Override
+  public boolean isClosed() {
+    synchronized (this) {
+      return isClosed;
+    }
+  }
+
+  public static class NodeAddedEvent extends TriggerEvent {
+
+    public NodeAddedEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames) {
+      // use the oldest time as the time of the event
+      super(eventType, source, times.get(0), null);
+      properties.put(NODE_NAMES, nodeNames);
+      properties.put(EVENT_TIMES, times);
+    }
+  }
+}

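Both NodeAddedTrigger and NodeLostTrigger gate firing on the same elapsed-time check: a node is stamped with a timestamp when first noticed, and an event fires only once waitFor seconds have passed. A minimal sketch of that bookkeeping, assuming System.nanoTime() as a stand-in for TimeSource.CURRENT_TIME.getTime() (the nanosecond unit is implied by the TimeUnit.NANOSECONDS conversion in run()); the class and method names below are illustrative, not part of the Solr API:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Sketch of the waitFor bookkeeping shared by both triggers: stamp a node when
// first seen, fire only after waitForSecond seconds have elapsed.
public class WaitForSketch {
  private final Map<String, Long> nodeNameVsTimeSeen = new HashMap<>();
  private final int waitForSecond;

  public WaitForSketch(int waitForSecond) {
    this.waitForSecond = waitForSecond;
  }

  // stamp a node the first time we notice it (nanosecond timestamp)
  public void track(String nodeName) {
    nodeNameVsTimeSeen.putIfAbsent(nodeName, System.nanoTime());
  }

  // true once waitForSecond seconds have elapsed since the node was stamped
  public boolean readyToFire(String nodeName) {
    Long timeSeen = nodeNameVsTimeSeen.get(nodeName);
    if (timeSeen == null) {
      return false;
    }
    long elapsed = TimeUnit.SECONDS.convert(System.nanoTime() - timeSeen, TimeUnit.NANOSECONDS);
    return elapsed >= waitForSecond;
  }
}
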
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccf1bf72/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
new file mode 100644
index 0000000..cbcbb3e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Trigger for the {@link TriggerEventType#NODELOST} event
+ */
+public class NodeLostTrigger extends TriggerBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final String name;
+  private final Map<String, Object> properties;
+  private final CoreContainer container;
+  private final ZkController zkController;
+  private final List<TriggerAction> actions;
+  private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
+  private final boolean enabled;
+  private final int waitForSecond;
+  private final TriggerEventType eventType;
+  private final TimeSource timeSource;
+
+  private boolean isClosed = false;
+
+  private Set<String> lastLiveNodes;
+
+  private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
+
+  public NodeLostTrigger(String name, Map<String, Object> properties,
+                         CoreContainer container, ZkController zkController) {
+    super(zkController.getZkClient());
+    this.name = name;
+    this.properties = properties;
+    this.container = container;
+    this.zkController = zkController;
+    this.timeSource = TimeSource.CURRENT_TIME;
+    this.processorRef = new AtomicReference<>();
+    List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
+    if (o != null && !o.isEmpty()) {
+      actions = new ArrayList<>(3);
+      for (Map<String, String> map : o) {
+        TriggerAction action = container.getResourceLoader().newInstance(map.get("class"), TriggerAction.class);
+        actions.add(action);
+      }
+    } else {
+      actions = Collections.emptyList();
+    }
+    lastLiveNodes = new HashSet<>(zkController.getZkStateReader().getClusterState().getLiveNodes());
+    log.debug("Initial livenodes: {}", lastLiveNodes);
+    this.enabled = Boolean.parseBoolean(String.valueOf(properties.getOrDefault("enabled", "true")));
+    this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
+    this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
+  }
+
+  @Override
+  public void init() {
+    List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
+    if (o != null && !o.isEmpty()) {
+      for (int i = 0; i < o.size(); i++) {
+        Map<String, String> map = o.get(i);
+        actions.get(i).init(map);
+      }
+    }
+    // pick up lost nodes for which marker paths were created
+    try {
+      List<String> lost = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+      lost.forEach(n -> {
+        // don't add nodes that have since come back
+        if (!lastLiveNodes.contains(n)) {
+          log.debug("Adding lost node from marker path: {}", n);
+          nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+        }
+        removeMarker(n);
+      });
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception retrieving nodeLost markers", e);
+    }
+  }
+
+  @Override
+  public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
+    processorRef.set(processor);
+  }
+
+  @Override
+  public AutoScaling.TriggerEventProcessor getProcessor() {
+    return processorRef.get();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public TriggerEventType getEventType() {
+    return eventType;
+  }
+
+  @Override
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  @Override
+  public int getWaitForSecond() {
+    return waitForSecond;
+  }
+
+  @Override
+  public Map<String, Object> getProperties() {
+    return properties;
+  }
+
+  @Override
+  public List<TriggerAction> getActions() {
+    return actions;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof NodeLostTrigger) {
+      NodeLostTrigger that = (NodeLostTrigger) obj;
+      return this.name.equals(that.name)
+          && this.properties.equals(that.properties);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, properties);
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (this) {
+      isClosed = true;
+      IOUtils.closeWhileHandlingException(actions);
+    }
+  }
+
+  @Override
+  public void restoreState(AutoScaling.Trigger old) {
+    assert old.isClosed();
+    if (old instanceof NodeLostTrigger) {
+      NodeLostTrigger that = (NodeLostTrigger) old;
+      assert this.name.equals(that.name);
+      this.lastLiveNodes = new HashSet<>(that.lastLiveNodes);
+      this.nodeNameVsTimeRemoved = new HashMap<>(that.nodeNameVsTimeRemoved);
+    } else  {
+      throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
+          "Unable to restore state from an unknown type of trigger");
+    }
+  }
+
+  @Override
+  protected Map<String, Object> getState() {
+    Map<String,Object> state = new HashMap<>();
+    state.put("lastLiveNodes", lastLiveNodes);
+    state.put("nodeNameVsTimeRemoved", nodeNameVsTimeRemoved);
+    return state;
+  }
+
+  @Override
+  protected void setState(Map<String, Object> state) {
+    this.lastLiveNodes.clear();
+    this.nodeNameVsTimeRemoved.clear();
+    Collection<String> lastLiveNodes = (Collection<String>)state.get("lastLiveNodes");
+    if (lastLiveNodes != null) {
+      this.lastLiveNodes.addAll(lastLiveNodes);
+    }
+    Map<String,Long> nodeNameVsTimeRemoved = (Map<String,Long>)state.get("nodeNameVsTimeRemoved");
+    if (nodeNameVsTimeRemoved != null) {
+      this.nodeNameVsTimeRemoved.putAll(nodeNameVsTimeRemoved);
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      synchronized (this) {
+        if (isClosed) {
+          log.warn("NodeLostTrigger ran but was already closed");
+          throw new RuntimeException("Trigger has been closed");
+        }
+      }
+
+      ZkStateReader reader = zkController.getZkStateReader();
+      Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
+      log.debug("Running NodeLostTrigger: {} with currently live nodes: {}", name, newLiveNodes);
+
+      // have any nodes that we were tracking been added to the cluster?
+      // if so, remove them from the tracking map
+      Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
+      trackingKeySet.removeAll(newLiveNodes);
+
+      // have any nodes been removed?
+      Set<String> copyOfLastLiveNodes = new HashSet<>(lastLiveNodes);
+      copyOfLastLiveNodes.removeAll(newLiveNodes);
+      copyOfLastLiveNodes.forEach(n -> {
+        log.debug("Tracking lost node: {}", n);
+        nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+      });
+
+      // has enough time expired to trigger events for a node?
+      List<String> nodeNames = new ArrayList<>();
+      List<Long> times = new ArrayList<>();
+      for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeRemoved.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Long> entry = it.next();
+        String nodeName = entry.getKey();
+        Long timeRemoved = entry.getValue();
+        long now = timeSource.getTime();
+        if (TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
+          nodeNames.add(nodeName);
+          times.add(timeRemoved);
+        }
+      }
+      // fire!
+      AutoScaling.TriggerEventProcessor processor = processorRef.get();
+      if (!nodeNames.isEmpty()) {
+        if (processor != null) {
+          log.debug("NodeLostTrigger firing registered processor for lost nodes: {}", nodeNames);
+          if (processor.process(new NodeLostEvent(getEventType(), getName(), times, nodeNames)))  {
+            // remove from tracking set only if the fire was accepted
+            nodeNames.forEach(n -> {
+              nodeNameVsTimeRemoved.remove(n);
+              removeMarker(n);
+            });
+          } else  {
+            log.debug("NodeLostTrigger listener for lost nodes: {} is not ready, will try later", nodeNames);
+          }
+        } else  {
+          nodeNames.forEach(n -> {
+            nodeNameVsTimeRemoved.remove(n);
+            removeMarker(n);
+          });
+        }
+      }
+      lastLiveNodes = new HashSet<>(newLiveNodes);
+    } catch (RuntimeException e) {
+      log.error("Unexpected exception in NodeLostTrigger", e);
+    }
+  }
+
+  private void removeMarker(String nodeName) {
+    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
+    try {
+      if (zkClient.exists(path, true)) {
+        zkClient.delete(path, -1, true);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception removing nodeLost marker " + nodeName, e);
+    }
+  }
+
+  @Override
+  public boolean isClosed() {
+    synchronized (this) {
+      return isClosed;
+    }
+  }
+
+  public static class NodeLostEvent extends TriggerEvent {
+
+    public NodeLostEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames) {
+      // use the oldest time as the time of the event
+      super(eventType, source, times.get(0), null);
+      properties.put(NODE_NAMES, nodeNames);
+      properties.put(EVENT_TIMES, times);
+    }
+  }
+}

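NodeLostTrigger mirrors NodeAddedTrigger almost line for line; the core of both run() methods is plain set differencing between two snapshots of live nodes. A small self-contained sketch of that diffing (node names are made up, the class is illustrative and not part of the Solr API):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch of the live-node diffing at the heart of both triggers: the previous
// snapshot minus the current one yields lost nodes, and the current snapshot
// minus the previous one yields added nodes.
public class LiveNodeDiff {
  public static void main(String[] args) {
    Set<String> lastLiveNodes = new HashSet<>(Arrays.asList("node1", "node2", "node3"));
    Set<String> newLiveNodes = new HashSet<>(Arrays.asList("node2", "node3", "node4"));

    Set<String> lost = new HashSet<>(lastLiveNodes);
    lost.removeAll(newLiveNodes);   // [node1] -> what NodeLostTrigger starts tracking

    Set<String> added = new HashSet<>(newLiveNodes);
    added.removeAll(lastLiveNodes); // [node4] -> what NodeAddedTrigger starts tracking

    System.out.println("lost=" + lost + " added=" + added);
  }
}
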
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccf1bf72/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
new file mode 100644
index 0000000..367c450
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CloudConfig;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
+
+/**
+ * Overseer thread responsible for reading triggers from ZooKeeper and
+ * adding/removing them from {@link ScheduledTriggers}
+ */
+public class OverseerTriggerThread implements Runnable, Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final ZkController zkController;
+
+  private final CloudConfig cloudConfig;
+
+  private final ZkStateReader zkStateReader;
+
+  private final SolrZkClient zkClient;
+
+  private final ScheduledTriggers scheduledTriggers;
+
+  private final AutoScaling.TriggerFactory triggerFactory;
+
+  private final ReentrantLock updateLock = new ReentrantLock();
+
+  private final Condition updated = updateLock.newCondition();
+
+  /*
+  Following variables are only accessed or modified when updateLock is held
+   */
+  private int znodeVersion = -1;
+
+  private Map<String, AutoScaling.Trigger> activeTriggers = new HashMap<>();
+
+  private volatile boolean isClosed = false;
+
+  private AutoScalingConfig autoScalingConfig;
+
+  public OverseerTriggerThread(ZkController zkController, CloudConfig cloudConfig) {
+    this.zkController = zkController;
+    this.cloudConfig = cloudConfig;
+    zkStateReader = zkController.getZkStateReader();
+    zkClient = zkController.getZkClient();
+    scheduledTriggers = new ScheduledTriggers(zkController);
+    triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer(), zkController);
+  }
+
+  @Override
+  public void close() throws IOException {
+    updateLock.lock();
+    try {
+      isClosed = true;
+      activeTriggers.clear();
+      updated.signalAll();
+    } finally {
+      updateLock.unlock();
+    }
+    IOUtils.closeQuietly(triggerFactory);
+    IOUtils.closeQuietly(scheduledTriggers);
+    log.debug("OverseerTriggerThread has been closed explicitly");
+  }
+
+  @Override
+  public void run() {
+    int lastZnodeVersion = znodeVersion;
+
+    // we automatically add a trigger for auto add replicas if one does not already exist
+    while (!isClosed)  {
+      try {
+        AutoScalingConfig autoScalingConfig = zkStateReader.getAutoScalingConfig();
+        AutoScalingConfig withAutoAddReplicasTrigger = withAutoAddReplicasTrigger(autoScalingConfig);
+        if (withAutoAddReplicasTrigger.equals(autoScalingConfig)) break;
+        log.debug("Adding .autoAddReplicas trigger");
+        zkClient.setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(withAutoAddReplicasTrigger), withAutoAddReplicasTrigger.getZkVersion(), true);
+        break;
+      } catch (KeeperException.BadVersionException bve) {
+        // somebody else has changed the configuration so we must retry
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("Interrupted", e);
+        break;
+      } catch (KeeperException e) {
+        log.error("A ZK error has occurred", e);
+      }
+    }
+
+    if (isClosed || Thread.currentThread().isInterrupted())  return;
+
+    try {
+      refreshAutoScalingConf(new AutoScalingWatcher());
+    } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
+      log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
+    } catch (KeeperException e) {
+      log.error("A ZK error has occurred", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      log.warn("Interrupted", e);
+    } catch (Exception e)  {
+      log.error("Unexpected exception", e);
+    }
+
+    while (true) {
+      Map<String, AutoScaling.Trigger> copy = null;
+      try {
+        // this can throw InterruptedException and we don't want to unlock if it did, so we keep this outside
+        // of the try/finally block
+        updateLock.lockInterruptibly();
+
+        // must check for close here before we await on the condition otherwise we can only be woken up on interruption
+        if (isClosed) {
+          log.warn("OverseerTriggerThread has been closed, exiting.");
+          break;
+        }
+
+        log.debug("Current znodeVersion {}, lastZnodeVersion {}", znodeVersion, lastZnodeVersion);
+
+        try {
+          if (znodeVersion == lastZnodeVersion) {
+            updated.await();
+
+            // are we closed?
+            if (isClosed) {
+              log.warn("OverseerTriggerThread woken up but we are closed, exiting.");
+              break;
+            }
+
+            // spurious wakeup?
+            if (znodeVersion == lastZnodeVersion) continue;
+          }
+          copy = new HashMap<>(activeTriggers);
+          lastZnodeVersion = znodeVersion;
+          log.debug("Processed trigger updates upto znodeVersion {}", znodeVersion);
+        } catch (InterruptedException e) {
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+          log.warn("Interrupted", e);
+          break;
+        } finally {
+          updateLock.unlock();
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("Interrupted", e);
+        break;
+      }
+
+      // update the current config
+      scheduledTriggers.setAutoScalingConfig(autoScalingConfig);
+
+      Set<String> managedTriggerNames = scheduledTriggers.getScheduledTriggerNames();
+      // remove the triggers which are no longer active
+      for (String managedTriggerName : managedTriggerNames) {
+        if (!copy.containsKey(managedTriggerName)) {
+          scheduledTriggers.remove(managedTriggerName);
+        }
+      }
+      // check for nodeLost triggers in the current config, and if
+      // absent then clean up old nodeLost / nodeAdded markers
+      boolean cleanOldNodeLostMarkers = true;
+      boolean cleanOldNodeAddedMarkers = true;
+      try {
+        // add new triggers and/or replace and close the replaced triggers
+        for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
+          if (entry.getValue().getEventType().equals(TriggerEventType.NODELOST)) {
+            cleanOldNodeLostMarkers = false;
+          }
+          if (entry.getValue().getEventType().equals(TriggerEventType.NODEADDED)) {
+            cleanOldNodeAddedMarkers = false;
+          }
+          scheduledTriggers.add(entry.getValue());
+        }
+      } catch (AlreadyClosedException e) {
+        // this _should_ mean that we're closing, complain loudly if that's not the case
+        if (isClosed) {
+          return;
+        } else {
+          throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e);
+        }
+      }
+      if (cleanOldNodeLostMarkers) {
+        log.debug("-- clean old nodeLost markers");
+        try {
+          List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+          markers.forEach(n -> {
+            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
+          });
+        } catch (KeeperException.NoNodeException e) {
+          // ignore
+        } catch (KeeperException | InterruptedException e) {
+          log.warn("Error removing old nodeLost markers", e);
+        }
+      }
+      if (cleanOldNodeAddedMarkers) {
+        log.debug("-- clean old nodeAdded markers");
+        try {
+          List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+          markers.forEach(n -> {
+            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
+          });
+        } catch (KeeperException.NoNodeException e) {
+          // ignore
+        } catch (KeeperException | InterruptedException e) {
+          log.warn("Error removing old nodeAdded markers", e);
+        }
+
+      }
+    }
+  }
+
+  private void removeNodeMarker(String path, String nodeName) {
+    path = path + "/" + nodeName;
+    try {
+      zkClient.delete(path, -1, true);
+      log.debug("  -- deleted " + path);
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Error removing old marker " + path, e);
+    }
+  }
+
+  class AutoScalingWatcher implements Watcher  {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      // session events are not change events, and do not remove the watcher
+      if (Event.EventType.None.equals(watchedEvent.getType())) {
+        return;
+      }
+
+      try {
+        refreshAutoScalingConf(this);
+      } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
+        log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
+      } catch (KeeperException e) {
+        log.error("A ZK error has occurred", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("Interrupted", e);
+      } catch (Exception e)  {
+        log.error("Unexpected exception", e);
+      }
+    }
+
+  }
+
+  private void refreshAutoScalingConf(Watcher watcher) throws KeeperException, InterruptedException {
+    updateLock.lock();
+    try {
+      if (isClosed) {
+        return;
+      }
+      AutoScalingConfig currentConfig = zkStateReader.getAutoScalingConfig(watcher);
+      log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, currentConfig.getZkVersion());
+      if (znodeVersion >= currentConfig.getZkVersion()) {
+        // protect against reordered watcher fires by ensuring that we only move forward
+        return;
+      }
+      autoScalingConfig = currentConfig;
+      znodeVersion = autoScalingConfig.getZkVersion();
+      Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, autoScalingConfig);
+
+      // remove all active triggers that have been removed from ZK
+      Set<String> trackingKeySet = activeTriggers.keySet();
+      trackingKeySet.retainAll(triggerMap.keySet());
+
+      // now let's add or remove triggers that have been enabled or disabled respectively
+      for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
+        String triggerName = entry.getKey();
+        AutoScaling.Trigger trigger = entry.getValue();
+        if (trigger.isEnabled()) {
+          activeTriggers.put(triggerName, trigger);
+        } else {
+          activeTriggers.remove(triggerName);
+        }
+      }
+      updated.signalAll();
+    } finally {
+      updateLock.unlock();
+    }
+  }
+
+  private AutoScalingConfig withAutoAddReplicasTrigger(AutoScalingConfig autoScalingConfig) {
+    Map<String, Object> triggerProps = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_PROPS;
+    String triggerName = (String) triggerProps.get("name");
+    Map<String, AutoScalingConfig.TriggerConfig> configs = autoScalingConfig.getTriggerConfigs();
+    for (AutoScalingConfig.TriggerConfig cfg : configs.values()) {
+      if (triggerName.equals(cfg.name)) {
+        // already has this trigger
+        return autoScalingConfig;
+      }
+    }
+    // need to add
+    triggerProps.computeIfPresent("waitFor", (k, v) -> (long) (cloudConfig.getAutoReplicaFailoverWaitAfterExpiration() / 1000));
+    AutoScalingConfig.TriggerConfig config = new AutoScalingConfig.TriggerConfig(triggerName, triggerProps);
+    autoScalingConfig = autoScalingConfig.withTriggerConfig(config);
+    // need to add SystemLogListener explicitly here
+    autoScalingConfig = AutoScalingHandler.withSystemLogListener(autoScalingConfig, triggerName);
+    return autoScalingConfig;
+  }
+
+  private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, AutoScalingConfig autoScalingConfig) {
+    Map<String, AutoScalingConfig.TriggerConfig> triggers = autoScalingConfig.getTriggerConfigs();
+    if (triggers == null) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, AutoScaling.Trigger> triggerMap = new HashMap<>(triggers.size());
+
+    for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
+      AutoScalingConfig.TriggerConfig cfg = entry.getValue();
+      TriggerEventType eventType = cfg.event;
+      String triggerName = entry.getKey();
+      triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, cfg.properties));
+    }
+    return triggerMap;
+  }
+}

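The main loop and the ZK watcher in OverseerTriggerThread coordinate through a ReentrantLock, a Condition, and a monotonically increasing znodeVersion: refreshAutoScalingConf only moves the version forward (guarding against reordered watcher fires) before signalling, and the loop re-checks the version after every wakeup (guarding against spurious wakeups). A self-contained sketch of that pattern, with illustrative names that are not part of the Solr API:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the coordination in OverseerTriggerThread: a watcher thread
// publishes new config versions under a lock and signals a condition; the main
// loop waits on the condition and ignores both spurious wakeups and stale
// (out-of-order) versions by comparing version numbers.
public class VersionedConfigSketch {
  private final ReentrantLock updateLock = new ReentrantLock();
  private final Condition updated = updateLock.newCondition();
  private int znodeVersion = -1; // guarded by updateLock

  // called from the ZK watcher thread
  public void onConfigChanged(int newVersion) {
    updateLock.lock();
    try {
      if (newVersion <= znodeVersion) {
        return; // reordered watcher fire; only ever move forward
      }
      znodeVersion = newVersion;
      updated.signalAll();
    } finally {
      updateLock.unlock();
    }
  }

  // called from the main loop; blocks until a version newer than lastSeen arrives
  public int awaitNewerThan(int lastSeen) throws InterruptedException {
    updateLock.lockInterruptibly();
    try {
      while (znodeVersion == lastSeen) { // loop handles spurious wakeups
        updated.await();
      }
      return znodeVersion;
    } finally {
      updateLock.unlock();
    }
  }
}
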
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccf1bf72/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
new file mode 100644
index 0000000..7a96552
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest.RequestStatusResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.ExecutePlanAction.waitForTaskToFinish;
+
+/**
+ * Responsible for scheduling active triggers, starting and stopping them and
+ * performing actions when they fire
+ */
+public class ScheduledTriggers implements Closeable {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
+  static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
+
+  private final Map<String, ScheduledTrigger> scheduledTriggers = new ConcurrentHashMap<>();
+
+  /**
+   * Thread pool for scheduling the triggers
+   */
+  private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+  /**
+   * Single-threaded executor that runs the actions for a trigger event. We rely on it being
+   * single-threaded to ensure that trigger fires do not step on each other, and that scheduled
+   * trigger threads do not run while an action submitted to this executor is still pending
+   */
+  private final ExecutorService actionExecutor;
+
+  private boolean isClosed = false;
+
+  private final AtomicBoolean hasPendingActions = new AtomicBoolean(false);
+
+  private final ActionThrottle actionThrottle;
+
+  private final SolrZkClient zkClient;
+
+  private final Overseer.Stats queueStats;
+
+  private final CoreContainer coreContainer;
+
+  private final TriggerListeners listeners;
+
+  private AutoScalingConfig autoScalingConfig;
+
+  public ScheduledTriggers(ZkController zkController) {
+    // todo make the core pool size configurable
+    // it is important to use more than one thread because a long-running trigger can starve other scheduled triggers
+    // ideally we would have as many core threads as triggers, but we don't know the number of triggers
+    // beforehand, and that many threads would be instantiated and kept around idle, which is wasteful.
+    // Four is a reasonable compromise.
+    scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(4,
+        new DefaultSolrThreadFactory("ScheduledTrigger"));
+    scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
+    scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
+    // todo make the wait time configurable
+    actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
+    coreContainer = zkController.getCoreContainer();
+    zkClient = zkController.getZkClient();
+    queueStats = new Overseer.Stats();
+    listeners = new TriggerListeners();
+  }
+
+  /**
+   * Set the current autoscaling config. This is invoked by {@link OverseerTriggerThread} when autoscaling.json is updated,
+   * and it re-initializes trigger listeners.
+   * @param autoScalingConfig current autoscaling.json
+   */
+  public void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
+    this.autoScalingConfig = autoScalingConfig;
+    listeners.setAutoScalingConfig(autoScalingConfig);
+  }
+
+  /**
+   * Adds a new trigger or replaces an existing one. The replaced trigger, if any, is closed
+   * <b>before</b> the new trigger is run. If a trigger is replaced with itself then this
+   * operation becomes a no-op.
+   *
+   * @param newTrigger the trigger to be managed
+   * @throws AlreadyClosedException if this class has already been closed
+   */
+  public synchronized void add(AutoScaling.Trigger newTrigger) {
+    if (isClosed) {
+      throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
+    }
+    ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger, zkClient, queueStats);
+    ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger);
+    if (old != null) {
+      if (old.trigger.equals(newTrigger)) {
+        // the trigger wasn't actually modified so we do nothing
+        return;
+      }
+      IOUtils.closeQuietly(old);
+      newTrigger.restoreState(old.trigger);
+      scheduledTrigger.setReplay(false);
+      scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
+    }
+    newTrigger.setProcessor(event -> {
+      if (coreContainer.isShutDown()) {
+        String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because Solr has been shutdown.", event.toString());
+        log.warn(msg);
+        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
+        return false;
+      }
+      ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
+      if (scheduledSource == null) {
+        String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
+        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, msg);
+        log.warn(msg);
+        return false;
+      }
+      boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
+      AutoScaling.Trigger source = scheduledSource.trigger;
+      if (source.isClosed()) {
+        String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
+        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
+        log.warn(msg);
+        // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
+        return false;
+      }
+      if (hasPendingActions.compareAndSet(false, true)) {
+        final boolean enqueued;
+        if (replaying) {
+          enqueued = false;
+        } else {
+          enqueued = scheduledTrigger.enqueue(event);
+        }
+        // fire STARTED event listeners after enqueuing the event is successful
+        listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);
+        List<TriggerAction> actions = source.getActions();
+        if (actions != null) {
+          actionExecutor.submit(() -> {
+            assert hasPendingActions.get();
+            log.debug("-- processing actions for " + event);
+            try {
+              // let the action executor thread wait instead of the trigger thread so we use the throttle here
+              actionThrottle.minimumWaitBetweenActions();
+              actionThrottle.markAttemptingAction();
+
+              // in future, we could wait for pending tasks in a different thread and re-enqueue
+              // this event so that we continue processing other events and not block this action executor
+              waitForPendingTasks(newTrigger, actions);
+
+              ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
+              for (TriggerAction action : actions) {
+                List<String> beforeActions = (List<String>)actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.BEFORE_ACTION.toString(), k -> new ArrayList<String>());
+                beforeActions.add(action.getName());
+                listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
+                try {
+                  action.process(event, actionContext);
+                } catch (Exception e) {
+                  listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, action.getName(), actionContext, e, null);
+                  log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
+                  throw e;
+                }
+                List<String> afterActions = (List<String>)actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.AFTER_ACTION.toString(), k -> new ArrayList<String>());
+                afterActions.add(action.getName());
+                listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
+              }
+              if (enqueued) {
+                TriggerEvent ev = scheduledTrigger.dequeue();
+                assert ev.getId().equals(event.getId());
+              }
+              listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
+            } finally {
+              hasPendingActions.set(false);
+            }
+          });
+        } else {
+          if (enqueued) {
+            TriggerEvent ev = scheduledTrigger.dequeue();
+            if (!ev.getId().equals(event.getId())) {
+              throw new RuntimeException("Wrong event dequeued, queue of " + scheduledTrigger.trigger.getName()
+              + " is broken! Expected event=" + event + " but got " + ev);
+            }
+          }
+          listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
+          hasPendingActions.set(false);
+        }
+        return true;
+      } else {
+        // there is an action in the queue and we don't want to enqueue another until it is complete
+        return false;
+      }
+    });
+    newTrigger.init(); // mark as ready for scheduling
+    scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
+  }
+
+  private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List<TriggerAction> actions) throws AlreadyClosedException {
+    try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
+        .withZkHost(coreContainer.getZkController().getZkServerAddress())
+        .withHttpClient(coreContainer.getUpdateShardHandler().getHttpClient())
+        .build()) {
+
+      SolrZkClient zkClient = coreContainer.getZkController().getZkClient();
+
+      for (TriggerAction action : actions) {
+        if (action instanceof ExecutePlanAction) {
+          String parentPath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + newTrigger.getName() + "/" + action.getName();
+          if (!zkClient.exists(parentPath, true))  {
+            break;
+          }
+          List<String> children = zkClient.getChildren(parentPath, null, true);
+          if (children != null) {
+            for (String child : children) {
+              String path = parentPath + '/' + child;
+              byte[] data = zkClient.getData(path, null, null, true);
+              if (data != null) {
+                Map map = (Map) Utils.fromJSON(data);
+                String requestid = (String) map.get("requestid");
+                try {
+                  log.debug("Found pending task with requestid={}", requestid);
+                  RequestStatusResponse statusResponse = waitForTaskToFinish(cloudSolrClient, requestid,
+                      ExecutePlanAction.DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                  if (statusResponse != null) {
+                    RequestStatusState state = statusResponse.getRequestStatus();
+                    if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) {
+                      zkClient.delete(path, -1, true);
+                    }
+                  }
+                } catch (Exception e) {
+                  if (coreContainer.isShutDown())  {
+                    throw e; // propagate the abort to the caller
+                  }
+                  Throwable rootCause = ExceptionUtils.getRootCause(e);
+                  if (rootCause instanceof IllegalStateException && rootCause.getMessage().contains("Connection pool shut down")) {
+                    throw e;
+                  }
+                  if (rootCause instanceof TimeoutException && rootCause.getMessage().contains("Could not connect to ZooKeeper")) {
+                    throw e;
+                  }
+                  log.error("Unexpected exception while waiting for pending task with requestid: " + requestid + " to finish", e);
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted", e);
+    } catch (Exception e) {
+      if (coreContainer.isShutDown())  {
+        throw new AlreadyClosedException("The Solr instance has been shutdown");
+      }
+      // we catch but don't rethrow because a failure to wait for pending tasks
+      // should not keep the actions from executing
+      log.error("Unexpected exception while waiting for pending tasks to finish", e);
+    }
+  }
+
+  /**
+   * Removes and stops the trigger with the given name. Also cleans up any leftover
+   * state / events in ZK.
+   *
+   * @param triggerName the name of the trigger to be removed
+   */
+  public synchronized void remove(String triggerName) {
+    ScheduledTrigger removed = scheduledTriggers.remove(triggerName);
+    IOUtils.closeQuietly(removed);
+    removeTriggerZKData(triggerName);
+  }
+
+  private void removeTriggerZKData(String triggerName) {
+    String statePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName;
+    String eventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName;
+    try {
+      zkDelTree(zkClient, statePath);
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Failed to remove state for removed trigger " + statePath, e);
+    }
+    try {
+      zkDelTree(zkClient, eventsPath);
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Failed to remove events for removed trigger " + eventsPath, e);
+    }
+  }
+
+  static List<OpResult> zkDelTree(SolrZkClient zkClient, String znode) throws KeeperException, InterruptedException {
+    if (zkClient.exists(znode, true)) {
+      ArrayList<Op> ops = new ArrayList<>();
+      zkDelTree(zkClient, znode, ops);
+      return zkClient.multi(ops, true);
+    }
+    return Collections.emptyList();
+  }
+
+  private static void zkDelTree(SolrZkClient zkClient, String znode, ArrayList<Op> ops) throws KeeperException, InterruptedException {
+    if (zkClient.exists(znode, true)) {
+      List<String> children = zkClient.getChildren(znode, null, true);
+      if (children != null) {
+        for (String child : children) {
+          String path = znode + "/" + child;
+          zkDelTree(zkClient, path, ops);
+        }
+      }
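+      // children were queued ahead of this node, so the batched multi() deletes leaves first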
+      ops.add(Op.delete(znode, -1));
+    }
+  }
+
+  /**
+   * @return an unmodifiable set of names of all triggers being managed by this class
+   */
+  public synchronized Set<String> getScheduledTriggerNames() {
+    return Collections.unmodifiableSet(new HashSet<>(scheduledTriggers.keySet())); // shallow copy
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (this) {
+      // mark that we are closed
+      isClosed = true;
+      for (ScheduledTrigger scheduledTrigger : scheduledTriggers.values()) {
+        IOUtils.closeQuietly(scheduledTrigger);
+      }
+      scheduledTriggers.clear();
+    }
+    // shutdown and interrupt all running tasks because there's no longer any
+    // guarantee about cluster state
+    scheduledThreadPoolExecutor.shutdownNow();
+    actionExecutor.shutdownNow();
+    listeners.close();
+  }
+
+  private class ScheduledTrigger implements Runnable, Closeable {
+    AutoScaling.Trigger trigger;
+    ScheduledFuture<?> scheduledFuture;
+    TriggerEventQueue queue;
+    boolean replay;
+    volatile boolean isClosed;
+
+    ScheduledTrigger(AutoScaling.Trigger trigger, SolrZkClient zkClient, Overseer.Stats stats) {
+      this.trigger = trigger;
+      this.queue = new TriggerEventQueue(zkClient, trigger.getName(), stats);
+      this.replay = true;
+      this.isClosed = false;
+    }
+
+    public void setReplay(boolean replay) {
+      this.replay = replay;
+    }
+
+    public boolean enqueue(TriggerEvent event) {
+      if (isClosed) {
+        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
+      }
+      return queue.offerEvent(event);
+    }
+
+    public TriggerEvent dequeue() {
+      if (isClosed) {
+        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
+      }
+      return queue.pollEvent();
+    }
+
+    @Override
+    public void run() {
+      if (isClosed) {
+        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
+      }
+      // fire a trigger only if an action is not pending
+      // note this is not foolproof, e.g. it does not prevent an action from being executed while a trigger
+      // is still executing. There is additional protection against that scenario in the event listener.
+      if (!hasPendingActions.get())  {
+        // replay accumulated events on first run, if any
+        if (replay) {
+          TriggerEvent event;
+          // peek first without removing - we may crash before calling the listener
+          while ((event = queue.peekEvent()) != null) {
+            // mark the event as REPLAYING so downstream processors can tell it apart from a live event
+            event.getProperties().put(TriggerEvent.REPLAYING, true);
+            if (! trigger.getProcessor().process(event)) {
+              log.error("Failed to re-play event, discarding: " + event);
+            }
+            queue.pollEvent(); // always remove it from queue
+          }
+          // now restore saved state to possibly generate new events from old state on the first run
+          try {
+            trigger.restoreState();
+          } catch (Exception e) {
+            // log but don't throw - see below
+            log.error("Error restoring trigger state " + trigger.getName(), e);
+          }
+          replay = false;
+        }
+        try {
+          trigger.run();
+        } catch (Exception e) {
+          // log but do not propagate exception because an exception thrown from a scheduled operation
+          // will suppress future executions
+          log.error("Unexpected exception from trigger: " + trigger.getName(), e);
+        } finally {
+          // checkpoint after each run
+          trigger.saveState();
+        }
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      isClosed = true;
+      if (scheduledFuture != null) {
+        scheduledFuture.cancel(true);
+      }
+      IOUtils.closeQuietly(trigger);
+    }
+  }
+
+  private class TriggerListeners {
+    Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
+    Map<String, TriggerListener> listenersPerName = new HashMap<>();
+    ReentrantLock updateLock = new ReentrantLock();
+
+    void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
+      updateLock.lock();
+      // we will recreate this from scratch
+      listenersPerStage.clear();
+      try {
+        Set<String> triggerNames = autoScalingConfig.getTriggerConfigs().keySet();
+        Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
+        Set<String> listenerNames = configs.entrySet().stream().map(entry -> entry.getValue().name).collect(Collectors.toSet());
+        // close listeners whose trigger no longer exists or whose config has been removed
+        for (Iterator<Map.Entry<String, TriggerListener>> it = listenersPerName.entrySet().iterator(); it.hasNext(); ) {
+          Map.Entry<String, TriggerListener> entry = it.next();
+          String name = entry.getKey();
+          TriggerListener listener = entry.getValue();
+          if (!triggerNames.contains(listener.getConfig().trigger) || !listenerNames.contains(name)) {
+            try {
+              listener.close();
+            } catch (Exception e) {
+              log.warn("Exception closing old listener " + listener.getConfig(), e);
+            }
+            it.remove();
+          }
+        }
+        for (Map.Entry<String, AutoScalingConfig.TriggerListenerConfig> entry : configs.entrySet()) {
+          AutoScalingConfig.TriggerListenerConfig config = entry.getValue();
+          if (!triggerNames.contains(config.trigger)) {
+            log.debug("-- skipping listener for non-existent trigger: {}", config);
+            continue;
+          }
+          // find previous instance and reuse if possible
+          TriggerListener oldListener = listenersPerName.get(config.name);
+          TriggerListener listener = null;
+          if (oldListener != null) {
+            if (!oldListener.getConfig().equals(config)) { // changed config
+              try {
+                oldListener.close();
+              } catch (Exception e) {
+                log.warn("Exception closing old listener " + oldListener.getConfig(), e);
+              }
+            } else {
+              listener = oldListener; // reuse
+            }
+          }
+          if (listener == null) { // create new instance
+            String clazz = config.listenerClass;
+            try {
+              listener = coreContainer.getResourceLoader().newInstance(clazz, TriggerListener.class);
+            } catch (Exception e) {
+              log.warn("Invalid TriggerListener class name '" + clazz + "', skipping...", e);
+            }
+            if (listener != null) {
+              try {
+                listener.init(coreContainer, config);
+                listenersPerName.put(config.name, listener);
+              } catch (Exception e) {
+                log.warn("Error initializing TriggerListener " + config, e);
+                IOUtils.closeQuietly(listener);
+                listener = null;
+              }
+            }
+          }
+          if (listener == null) {
+            continue;
+          }
+          // add per stage
+          for (TriggerEventProcessorStage stage : config.stages) {
+            addPerStage(config.trigger, stage, listener);
+          }
+          // also register for the BEFORE_ACTION / AFTER_ACTION stages when specific actions are configured
+          if (!config.beforeActions.isEmpty()) {
+            addPerStage(config.trigger, TriggerEventProcessorStage.BEFORE_ACTION, listener);
+          }
+          if (!config.afterActions.isEmpty()) {
+            addPerStage(config.trigger, TriggerEventProcessorStage.AFTER_ACTION, listener);
+          }
+        }
+      } finally {
+        updateLock.unlock();
+      }
+    }
+
+    private void addPerStage(String triggerName, TriggerEventProcessorStage stage, TriggerListener listener) {
+      Map<TriggerEventProcessorStage, List<TriggerListener>> perStage =
+          listenersPerStage.computeIfAbsent(triggerName, k -> new HashMap<>());
+      List<TriggerListener> lst = perStage.computeIfAbsent(stage, k -> new ArrayList<>(3));
+      lst.add(listener);
+    }
+
+    void reset() {
+      updateLock.lock();
+      try {
+        listenersPerStage.clear();
+        for (TriggerListener listener : listenersPerName.values()) {
+          IOUtils.closeQuietly(listener);
+        }
+        listenersPerName.clear();
+      } finally {
+        updateLock.unlock();
+      }
+    }
+
+    void close() {
+      reset();
+    }
+
+    List<TriggerListener> getTriggerListeners(String trigger, TriggerEventProcessorStage stage) {
+      Map<TriggerEventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
+      if (perStage == null) {
+        return Collections.emptyList();
+      }
+      List<TriggerListener> lst = perStage.get(stage);
+      if (lst == null) {
+        return Collections.emptyList();
+      } else {
+        return Collections.unmodifiableList(lst);
+      }
+    }
+
+    void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage) {
+      fireListeners(trigger, event, stage, null, null, null, null);
+    }
+
+    void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String message) {
+      fireListeners(trigger, event, stage, null, null, null, message);
+    }
+
+    void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+                       ActionContext context) {
+      fireListeners(trigger, event, stage, actionName, context, null, null);
+    }
+
+    void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+                       ActionContext context, Throwable error, String message) {
+      updateLock.lock();
+      try {
+        for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
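+          // for before/after-action stages, skip listeners not registered for this specific action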
+          if (actionName != null) {
+            AutoScalingConfig.TriggerListenerConfig config = listener.getConfig();
+            if (stage == TriggerEventProcessorStage.BEFORE_ACTION) {
+              if (!config.beforeActions.contains(actionName)) {
+                continue;
+              }
+            } else if (stage == TriggerEventProcessorStage.AFTER_ACTION) {
+              if (!config.afterActions.contains(actionName)) {
+                continue;
+              }
+            }
+          }
+          try {
+            listener.onEvent(event, stage, actionName, context, error, message);
+          } catch (Exception e) {
+            log.warn("Exception running listener " + listener.getConfig(), e);
+          }
+        }
+      } finally {
+        updateLock.unlock();
+      }
+    }
+  }
+}
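
The zkDelTree helpers above remove a znode subtree atomically: the private
overload walks the tree depth-first, queueing a delete Op for every child
before its parent, and the public overload submits the whole batch in one
multi() call. A minimal standalone sketch of the same pattern against the
raw ZooKeeper client follows; the connection string and root path are
illustrative, not taken from this patch.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.Op;
    import org.apache.zookeeper.ZooKeeper;

    public class ZkDelTreeSketch {
      static void collectDeletes(ZooKeeper zk, String znode, List<Op> ops) throws Exception {
        if (zk.exists(znode, false) == null) {
          return; // nothing to delete
        }
        for (String child : zk.getChildren(znode, false)) {
          collectDeletes(zk, znode + "/" + child, ops); // children queued before parent
        }
        ops.add(Op.delete(znode, -1)); // version -1 matches any node version
      }

      public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, null);
        List<Op> ops = new ArrayList<>();
        collectDeletes(zk, "/autoscaling-demo", ops);
        if (!ops.isEmpty()) {
          zk.multi(ops); // all-or-nothing: the whole subtree goes, or none of it
        }
        zk.close();
      }
    }

Note that a single multi() request is atomic but bounded by ZooKeeper's
request size limit (jute.maxbuffer), so very large subtrees would need to
be deleted in chunks.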

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccf1bf72/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
new file mode 100644
index 0000000..a72b174
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringJoiner;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.IdUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This listener saves events to the {@link CollectionAdminParams#SYSTEM_COLL} collection.
+ * <p>Configuration properties:</p>
+ * <ul>
+ *   <li>collection - optional string specifying which collection stores the events. Defaults to
+ *   {@link CollectionAdminParams#SYSTEM_COLL}.</li>
+ *   <li>enabled - optional boolean, true by default; when false the listener ignores all events.</li>
+ * </ul>
+ */
+public class SystemLogListener extends TriggerListenerBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
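+  // field name suffixes follow Solr dynamic-field conventions: _s single string, _t text, _ss multi-valued strings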
+  public static final String SOURCE_FIELD = "source_s";
+  public static final String EVENT_SOURCE_FIELD = "event.source_s";
+  public static final String EVENT_TYPE_FIELD = "event.type_s";
+  public static final String STAGE_FIELD = "stage_s";
+  public static final String ACTION_FIELD = "action_s";
+  public static final String MESSAGE_FIELD = "message_t";
+  public static final String BEFORE_ACTIONS_FIELD = "before.actions_ss";
+  public static final String AFTER_ACTIONS_FIELD = "after.actions_ss";
+  public static final String COLLECTIONS_FIELD = "collections_ss";
+  public static final String SOURCE = SystemLogListener.class.getSimpleName();
+  public static final String DOC_TYPE = "autoscaling_event";
+
+  private String collection = CollectionAdminParams.SYSTEM_COLL;
+  private boolean enabled = true;
+
+  @Override
+  public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
+    super.init(coreContainer, config);
+    collection = (String)config.properties.getOrDefault(CollectionAdminParams.COLLECTION, CollectionAdminParams.SYSTEM_COLL);
+    enabled = Boolean.parseBoolean(String.valueOf(config.properties.getOrDefault("enabled", true)));
+  }
+
+  @Override
+  public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context,
+               Throwable error, String message) throws Exception {
+    if (!enabled) {
+      return;
+    }
+    try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
+        .withZkHost(coreContainer.getZkController().getZkServerAddress())
+        .withHttpClient(coreContainer.getUpdateShardHandler().getHttpClient())
+        .build()) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField(CommonParams.TYPE, DOC_TYPE);
+      doc.addField(SOURCE_FIELD, SOURCE);
+      doc.addField("id", IdUtils.timeRandomId());
+      doc.addField("event.id_s", event.getId());
+      doc.addField(EVENT_TYPE_FIELD, event.getEventType().toString());
+      doc.addField(EVENT_SOURCE_FIELD, event.getSource());
+      doc.addField("event.time_l", event.getEventTime());
+      doc.addField("timestamp", new Date());
+      addMap("event.property.", doc, event.getProperties());
+      doc.addField(STAGE_FIELD, stage.toString());
+      if (actionName != null) {
+        doc.addField(ACTION_FIELD, actionName);
+      }
+      if (message != null) {
+        doc.addField(MESSAGE_FIELD, message);
+      }
+      addError(doc, error);
+      // add JSON versions of event and context
+      String eventJson = Utils.toJSONString(event);
+      doc.addField("event_str", eventJson);
+      if (context != null) {
+        // capture specifics of operations after compute_plan action
+        addOperations(doc, (List<SolrRequest>)context.getProperties().get("operations"));
+        // capture specifics of responses after execute_plan action
+        addResponses(doc, (List<NamedList<Object>>)context.getProperties().get("responses"));
+        addActions(BEFORE_ACTIONS_FIELD, doc, (List<String>)context.getProperties().get(TriggerEventProcessorStage.BEFORE_ACTION.toString()));
+        addActions(AFTER_ACTIONS_FIELD, doc, (List<String>)context.getProperties().get(TriggerEventProcessorStage.AFTER_ACTION.toString()));
+        String contextJson = Utils.toJSONString(context);
+        doc.addField("context_str", contextJson);
+      }
+      UpdateRequest req = new UpdateRequest();
+      req.add(doc);
+      cloudSolrClient.request(req, collection);
+    } catch (Exception e) {
+      if ((e instanceof SolrException) && e.getMessage() != null && e.getMessage().contains("Collection not found")) {
+        // relatively benign
+        log.info("Collection " + collection + " does not exist, disabling logging.");
+        enabled = false;
+      } else {
+        log.warn("Exception sending event to collection " + collection, e);
+      }
+    }
+  }
+
+  private void addActions(String field, SolrInputDocument doc, List<String> actions) {
+    if (actions == null) {
+      return;
+    }
+    actions.forEach(a -> doc.addField(field, a));
+  }
+
+  private void addMap(String prefix, SolrInputDocument doc, Map<String, Object> map) {
+    map.forEach((k, v) -> {
+      if (v instanceof Collection) {
+        for (Object o : (Collection)v) {
+          doc.addField(prefix + k + "_ss", String.valueOf(o));
+        }
+      } else {
+        doc.addField(prefix + k + "_ss", String.valueOf(v));
+      }
+    });
+  }
+
+  private void addOperations(SolrInputDocument doc, List<SolrRequest> operations) {
+    if (operations == null || operations.isEmpty()) {
+      return;
+    }
+    Set<String> collections = new HashSet<>();
+    for (SolrRequest req : operations) {
+      SolrParams params = req.getParams();
+      if (params == null) {
+        continue;
+      }
+      if (params.get(CollectionAdminParams.COLLECTION) != null) {
+        collections.add(params.get(CollectionAdminParams.COLLECTION));
+      }
+      // build a whitespace-separated param string
+      StringJoiner paramJoiner = new StringJoiner(" ");
+      paramJoiner.setEmptyValue("");
+      for (Iterator<String> it = params.getParameterNamesIterator(); it.hasNext(); ) {
+        final String name = it.next();
+        final String [] values = params.getParams(name);
+        for (String value : values) {
+          paramJoiner.add(name + "=" + value);
+        }
+      }
+      String paramString = paramJoiner.toString();
+      if (!paramString.isEmpty()) {
+        doc.addField("operations.params_ts", paramString);
+      }
+    }
+    if (!collections.isEmpty()) {
+      doc.addField(COLLECTIONS_FIELD, collections);
+    }
+  }
+
+  private void addResponses(SolrInputDocument doc, List<NamedList<Object>> responses) {
+    if (responses == null || responses.isEmpty()) {
+      return;
+    }
+    for (NamedList<Object> rsp : responses) {
+      Object o = rsp.get("success");
+      if (o != null) {
+        doc.addField("responses_ts", "success " + o);
+      } else {
+        o = rsp.get("failure");
+        if (o != null) {
+          doc.addField("responses_ts", "failure " + o);
+        } else { // something else
+          doc.addField("responses_ts", Utils.toJSONString(rsp));
+        }
+      }
+    }
+  }
+
+  private void addError(SolrInputDocument doc, Throwable error) {
+    if (error == null) {
+      return;
+    }
+    StringWriter sw = new StringWriter();
+    try (PrintWriter pw = new PrintWriter(sw)) {
+      error.printStackTrace(pw);
+    }
+    doc.addField("error.message_t", error.getMessage());
+    doc.addField("error.details_t", sw.toString());
+  }
+}
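
As a usage sketch, a listener of this class might be registered through the
autoscaling API with a set-listener command along these lines; the listener
and trigger names and the stage list are illustrative, not defaults:

    {
      "set-listener": {
        "name": "system_log",
        "trigger": "node_added_trigger",
        "stage": ["STARTED", "SUCCEEDED", "FAILED"],
        "class": "org.apache.solr.cloud.autoscaling.SystemLogListener",
        "collection": ".system"
      }
    }

The optional collection property lands in config.properties and is read in
init() above; the remaining fields are handled by the autoscaling framework.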

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccf1bf72/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java
new file mode 100644
index 0000000..76cd5f0
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.Closeable;
+
+import org.apache.solr.util.plugin.MapInitializedPlugin;
+
+/**
+ * Interface for actions performed in response to a trigger being activated
+ */
+public interface TriggerAction extends MapInitializedPlugin, Closeable {
+  String getName();
+
+  void process(TriggerEvent event, ActionContext context);
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccf1bf72/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
new file mode 100644
index 0000000..75c4a87
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class for {@link TriggerAction} implementations.
+ */
+public abstract class TriggerActionBase implements TriggerAction {
+
+  protected Map<String, String> initArgs;
+
+  @Override
+  public String getName() {
+    if (initArgs != null) {
+      return initArgs.get("name");
+    } else {
+      return getClass().getSimpleName();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public void init(Map<String, String> args) {
+    this.initArgs = args;
+  }
+}
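
To illustrate the contract, a minimal do-nothing action built on this base
class could look like the sketch below; the class name and log message are
hypothetical, not part of this patch.

    package org.apache.solr.cloud.autoscaling;

    import java.lang.invoke.MethodHandles;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogEventAction extends TriggerActionBase {
      private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

      @Override
      public void process(TriggerEvent event, ActionContext context) {
        // a real action would compute or execute a plan here
        log.info("Action {} received event {}", getName(), event);
      }
    }

getName(), init() and close() are inherited from TriggerActionBase, so a
subclass only has to supply the process() logic.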

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccf1bf72/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
new file mode 100644
index 0000000..7aff846
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} implementations.
+ * It handles state snapshot / restore in ZK.
+ */
+public abstract class TriggerBase implements AutoScaling.Trigger {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected SolrZkClient zkClient;
+  protected Map<String,Object> lastState;
+
+
+  protected TriggerBase(SolrZkClient zkClient) {
+    this.zkClient = zkClient;
+    try {
+      zkClient.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, false, true);
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception checking ZK path " + ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, e);
+    }
+  }
+
+  /**
+   * Prepare and return internal state of this trigger in a format suitable for persisting in ZK.
+   * @return map of internal state properties. Note: values must be supported by {@link Utils#toJSON(Object)}.
+   */
+  protected abstract Map<String,Object> getState();
+
+  /**
+   * Restore internal state of this trigger from properties retrieved from ZK.
+   * @param state never null but may be empty.
+   */
+  protected abstract void setState(Map<String,Object> state);
+
+  @Override
+  public void saveState() {
+    Map<String,Object> state = Utils.getDeepCopy(getState(), 10, false, true);
+    if (lastState != null && lastState.equals(state)) {
+      // skip saving if identical
+      return;
+    }
+    byte[] data = Utils.toJSON(state);
+    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
+    try {
+      if (zkClient.exists(path, true)) {
+        // update
+        zkClient.setData(path, data, -1, true);
+      } else {
+        // create
+        zkClient.create(path, data, CreateMode.PERSISTENT, true);
+      }
+      lastState = state;
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception updating trigger state '" + path + "'", e);
+    }
+  }
+
+  @Override
+  public void restoreState() {
+    byte[] data = null;
+    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
+    try {
+      if (zkClient.exists(path, true)) {
+        data = zkClient.getData(path, null, new Stat(), true);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception getting trigger state '" + path + "'", e);
+    }
+    if (data != null) {
+      Map<String, Object> restoredState = (Map<String, Object>)Utils.fromJSON(data);
+      // make sure lastState is sorted
+      restoredState = Utils.getDeepCopy(restoredState, 10, false, true);
+      setState(restoredState);
+      lastState = restoredState;
+    }
+  }
+}
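
getState()/setState() are the extension points behind the saveState() and
restoreState() machinery above. A hedged sketch of the two overrides inside
a hypothetical subclass, assuming the trigger tracks when each node was
first seen (the field and key names are illustrative; the other Trigger
methods are omitted):

    private final Map<String, Long> nodeNameVsTimeSeen = new HashMap<>();

    @Override
    protected Map<String, Object> getState() {
      Map<String, Object> state = new HashMap<>();
      // snapshot into a fresh map so later mutations don't leak into lastState
      state.put("nodeNameVsTimeSeen", new HashMap<>(nodeNameVsTimeSeen));
      return state;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected void setState(Map<String, Object> state) {
      nodeNameVsTimeSeen.clear();
      Map<String, Long> saved = (Map<String, Long>) state.get("nodeNameVsTimeSeen");
      if (saved != null) {
        nodeNameVsTimeSeen.putAll(saved);
      }
    }

Because saveState() skips the ZK write when the new snapshot equals
lastState, returning a fresh copy from getState() is what keeps that
comparison honest.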

