lucene-commits mailing list archives

From ab@apache.org
Subject [3/3] lucene-solr:jira/solr-10745: SOLR-10745: Fix buggy element removal in triggers. Move marker processing to trigger init(). Clean old markers on Overseer start if there are no triggers to consume them.
Date Thu, 01 Jun 2017 14:04:45 GMT
SOLR-10745: Fix buggy element removal in triggers. Move marker processing to
trigger init(). Clean old markers on Overseer start if there are no triggers to
consume them.
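The "buggy element removal" is the modify-while-iterating pattern: both triggers looped over
the tracking map's entrySet() while removing fired nodes through a separate trackingKeySet
handle, so the removal was not tied to the live iteration. The fix in the diff below drives
each loop with an explicit Iterator and calls it.remove(). A minimal, self-contained sketch
of the fixed pattern (the class name and the fired() helper are hypothetical stand-ins for
the real trigger and listener.triggerFired(...)):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class IteratorRemovalSketch {
      // hypothetical stand-in for listener.triggerFired(event)
      static boolean fired(String node) { return node.startsWith("lost-"); }

      public static void main(String[] args) {
        Map<String, Long> tracked = new HashMap<>();
        tracked.put("lost-node1", 1L);
        tracked.put("node2", 2L);
        // Iterator.remove() is the one removal tied to the current iteration
        // position; removing through the map itself (or a key-set view) mid-loop
        // throws ConcurrentModificationException on a plain HashMap.
        for (Iterator<Map.Entry<String, Long>> it = tracked.entrySet().iterator(); it.hasNext(); ) {
          Map.Entry<String, Long> entry = it.next();
          if (fired(entry.getKey())) {
            it.remove(); // safe in-place removal of the current entry
          }
        }
        System.out.println(tracked); // {node2=2}
      }
    }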


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b67de922
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b67de922
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b67de922

Branch: refs/heads/jira/solr-10745
Commit: b67de922e409d8c88b9e82655ed4a3bf0d35d4f7
Parents: c0f3f0d
Author: Andrzej Bialecki <ab@apache.org>
Authored: Thu Jun 1 16:02:13 2017 +0200
Committer: Andrzej Bialecki <ab@apache.org>
Committed: Thu Jun 1 16:02:13 2017 +0200

----------------------------------------------------------------------
 .../org/apache/solr/cloud/ZkController.java     | 12 +--
 .../cloud/autoscaling/AutoScalingConfig.java    | 97 ++++++++++++++++++++
 .../cloud/autoscaling/NodeAddedTrigger.java     | 60 +++++++-----
 .../solr/cloud/autoscaling/NodeLostTrigger.java | 56 ++++++-----
 .../autoscaling/OverseerTriggerThread.java      | 52 ++++++++++-
 .../solr/cloud/autoscaling/TriggerBase.java     |  8 +-
 .../solr/cloud/autoscaling/TestPolicyCloud.java |  1 -
 .../autoscaling/TriggerIntegrationTest.java     | 91 +++++++++++-------
 8 files changed, 285 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a9004b8..37019ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -741,11 +741,9 @@ public class ZkController {
     LiveNodesListener listener = (oldNodes, newNodes) -> {
       oldNodes.removeAll(newNodes);
       if (oldNodes.isEmpty()) { // only added nodes
-        log.debug("-- skip, only new nodes: " + newNodes);
         return;
       }
       if (isClosed) {
-        log.debug("-- skip, closed: old=" + oldNodes + ", new=" + newNodes);
         return;
       }
       // if this node is in the top three then attempt to create nodeLost message
@@ -755,21 +753,17 @@ public class ZkController {
           break;
         }
         if (i > 2) {
-          log.debug("-- skip, " + getNodeName() + " not in the top 3 of " + newNodes);
           return; // this node is not in the top three
         }
         i++;
       }
+
       for (String n : oldNodes) {
         String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
         try {
-          // nocommit decide between EPHEMERAL vs. PERSISTENT, the latter needs
-          // explicit cleanup on cluster restart if there are no nodeLost triggers
           zkClient.create(path, null, CreateMode.PERSISTENT, true);
-          log.debug("-- created " + path);
         } catch (KeeperException.NodeExistsException e) {
           // someone else already created this node - ignore
-          log.debug("-- skip, already exists " + path);
         } catch (KeeperException | InterruptedException e1) {
           log.warn("Unable to register nodeLost path for " + n, e1);
         }
@@ -857,8 +851,8 @@ public class ZkController {
     List<Op> ops = new ArrayList<>(2);
     ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
     if (!zkClient.exists(nodeAddedPath, true)) {
-      // nocommit use EPHEMERAL or PERSISTENT?
-      // EPHEMERAL will disappear if this node shuts down, PERSISTENT will need an explicit cleanup
+      // use EPHEMERAL so that it disappears if this node goes down
+      // and no other action is taken
       ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
     }
     zkClient.multi(ops, true);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
new file mode 100644
index 0000000..2877cb9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
@@ -0,0 +1,97 @@
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.Utils;
+
+/**
+ * Simple bean representation of <code>autoscaling.json</code>, which parses data
+ * lazily.
+ */
+public class AutoScalingConfig {
+
+  private final Map<String, Object> jsonMap;
+
+  private Policy policy;
+  private Map<String, TriggerConfig> triggers;
+  private Map<String, ListenerConfig> listeners;
+
+  /**
+   * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.TriggerListener} config.
+   */
+  public static class ListenerConfig {
+    public String trigger;
+    public List<String> stages;
+    public String listenerClass;
+    public List<Map<String, String>> beforeActions;
+    public List<Map<String, String>> afterActions;
+
+    public ListenerConfig(Map<String, Object> properties) {
+      trigger = (String)properties.get(AutoScalingParams.TRIGGER);
+      stages = (List<String>)properties.getOrDefault(AutoScalingParams.STAGE, Collections.emptyList());
+      listenerClass = (String)properties.get(AutoScalingParams.CLASS);
+      beforeActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.BEFORE_ACTION, Collections.emptyList());
+      afterActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.AFTER_ACTION, Collections.emptyList());
+    }
+  }
+
+  /**
+   * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
+   */
+  public static class TriggerConfig {
+    public final AutoScaling.EventType eventType;
+    public final Map<String, Object> properties = new HashMap<>();
+
+    public TriggerConfig(Map<String, Object> properties) {
+      String event = (String) properties.get(AutoScalingParams.EVENT);
+      this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
+      this.properties.putAll(properties);
+    }
+  }
+
+  public AutoScalingConfig(Map<String, Object> jsonMap) {
+    this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
+  }
+
+  public Policy getPolicy() {
+    if (policy == null) {
+      policy = new Policy(jsonMap);
+    }
+    return policy;
+  }
+
+  public Map<String, TriggerConfig> getTriggerConfigs() {
+    if (triggers == null) {
+      Map<String, Object> trigMap = (Map<String, Object>)jsonMap.get("triggers");
+      if (trigMap == null) {
+        triggers = Collections.emptyMap();
+      } else {
+        triggers = new HashMap<>(trigMap.size());
+        for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
+          triggers.put(entry.getKey(), new TriggerConfig((Map<String, Object>)entry.getValue()));
+        }
+      }
+    }
+    return triggers;
+  }
+
+  public Map<String, ListenerConfig> getListenerConfigs() {
+    if (listeners == null) {
+      Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
+      if (map == null) {
+        listeners = Collections.emptyMap();
+      } else {
+        listeners = new HashMap<>(map.size());
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+          listeners.put(entry.getKey(), new ListenerConfig((Map<String, Object>)entry.getValue()));
+        }
+      }
+    }
+    return listeners;
+  }
+}
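A hypothetical usage sketch of the new bean (the JSON literal mirrors the trigger shape used
by the set-trigger commands in the integration test below; Utils.fromJSONString is the
existing Solr JSON helper, and the lazy parsing means the triggers section is only walked on
first access):

    import java.util.Map;
    import org.apache.solr.cloud.autoscaling.AutoScalingConfig;
    import org.apache.solr.common.util.Utils;

    public class AutoScalingConfigSketch {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        String json = "{\"triggers\": {\"node_lost_trigger\": "
            + "{\"event\": \"nodeLost\", \"waitFor\": \"10s\"}}}";
        AutoScalingConfig config = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(json));
        // prints: node_lost_trigger -> NODELOST
        config.getTriggerConfigs().forEach((name, tc) -> System.out.println(name + " -> " + tc.eventType));
        // no listeners section in the JSON, so this prints: true
        System.out.println(config.getListenerConfigs().isEmpty());
      }
    }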

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index f304ba7..7a46fc7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -62,7 +62,7 @@ public class NodeAddedTrigger extends TriggerBase {
 
   private Set<String> lastLiveNodes;
 
-  private Map<String, Long> nodeNameVsTimeAdded = new ConcurrentHashMap<>();
+  private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
 
   public NodeAddedTrigger(String name, Map<String, Object> properties,
                           CoreContainer container) {
@@ -99,6 +99,20 @@ public class NodeAddedTrigger extends TriggerBase {
         actions.get(i).init(map);
       }
     }
+    // pick up added nodes for which marker paths were created
+    try {
+      List<String> added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+      added.forEach(n -> {
+        log.debug("Adding node from marker path: {}", n);
+        nodeNameVsTimeAdded.put(n, timeSource.getTime());
+        removeNodeAddedMarker(n);
+      });
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception retrieving nodeLost markers", e);
+    }
+
   }
 
   @Override
@@ -225,28 +239,13 @@ public class NodeAddedTrigger extends TriggerBase {
       copyOfNew.removeAll(lastLiveNodes);
       copyOfNew.forEach(n -> {
         long eventTime = timeSource.getTime();
-        nodeNameVsTimeAdded.put(n, eventTime);
         log.debug("Tracking new node: {} at time {}", n, eventTime);
+        nodeNameVsTimeAdded.put(n, eventTime);
       });
 
-      // pick up added nodes for which marker paths were created
-      try {
-        List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
-        lost.forEach(n -> {
-          log.debug("Adding node from marker path: {}", n);
-          nodeNameVsTimeAdded.put(n, timeSource.getTime());
-          try {
-            container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n, -1, true);
-          } catch (KeeperException | InterruptedException e) {
-            log.debug("Exception removing nodeAdded marker " + n, e);
-          }
-        });
-      } catch (KeeperException | InterruptedException e) {
-        log.warn("Exception retrieving nodeLost markers", e);
-      }
-
       // has enough time expired to trigger events for a node?
-      for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
+      for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeAdded.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Long> entry = it.next();
         String nodeName = entry.getKey();
         Long timeAdded = entry.getValue();
         long now = timeSource.getTime();
@@ -257,20 +256,35 @@ public class NodeAddedTrigger extends TriggerBase {
             log.debug("NodeAddedTrigger {} firing registered listener for node: {} added
at time {} , now: {}", name, nodeName, timeAdded, now);
            if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
               // remove from tracking set only if the fire was accepted
-              trackingKeySet.remove(nodeName);
+              it.remove();
+              removeNodeAddedMarker(nodeName);
             }
           } else  {
-            trackingKeySet.remove(nodeName);
+            it.remove();
+            removeNodeAddedMarker(nodeName);
           }
         }
       }
-
       lastLiveNodes = new HashSet(newLiveNodes);
     } catch (RuntimeException e) {
       log.error("Unexpected exception in NodeAddedTrigger", e);
     }
   }
 
+  private void removeNodeAddedMarker(String nodeName) {
+    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
+    try {
+      if (container.getZkController().getZkClient().exists(path, true)) {
+        container.getZkController().getZkClient().delete(path, -1, true);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.debug("Exception removing nodeAdded marker " + nodeName, e);
+    }
+
+  }
+
   @Override
   public boolean isClosed() {
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 6450bda..2af4cc5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -62,7 +62,7 @@ public class NodeLostTrigger extends TriggerBase {
 
   private Set<String> lastLiveNodes;
 
-  private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
+  private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
 
   public NodeLostTrigger(String name, Map<String, Object> properties,
                          CoreContainer container) {
@@ -98,6 +98,19 @@ public class NodeLostTrigger extends TriggerBase {
         actions.get(i).init(map);
       }
     }
+    // pick up lost nodes for which marker paths were created
+    try {
+      List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+      lost.forEach(n -> {
+        log.debug("Adding lost node from marker path: {}", n);
+        nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+        removeNodeLostMarker(n);
+      });
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception retrieving nodeLost markers", e);
+    }
   }
 
   @Override
@@ -227,24 +240,9 @@ public class NodeLostTrigger extends TriggerBase {
         nodeNameVsTimeRemoved.put(n, timeSource.getTime());
       });
 
-      // pick up lost nodes for which marker paths were created
-      try {
-        List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
-        lost.forEach(n -> {
-          log.debug("Adding lost node from marker path: {}", n);
-          nodeNameVsTimeRemoved.put(n, timeSource.getTime());
-          try {
-            container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n, -1, true);
-          } catch (KeeperException | InterruptedException e) {
-            log.warn("Exception removing nodeLost marker " + n, e);
-          }
-        });
-      } catch (KeeperException | InterruptedException e) {
-        log.warn("Exception retrieving nodeLost markers", e);
-      }
-
       // has enough time expired to trigger events for a node?
-      for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
+      for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeRemoved.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Long> entry = it.next();
         String nodeName = entry.getKey();
         Long timeRemoved = entry.getValue();
        if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
@@ -253,20 +251,34 @@ public class NodeLostTrigger extends TriggerBase {
           if (listener != null) {
             log.debug("NodeLostTrigger firing registered listener");
            if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
-              trackingKeySet.remove(nodeName);
+              it.remove();
+              removeNodeLostMarker(nodeName);
             }
           } else  {
-            trackingKeySet.remove(nodeName);
+            it.remove();
+            removeNodeLostMarker(nodeName);
           }
         }
       }
-
       lastLiveNodes = new HashSet<>(newLiveNodes);
     } catch (RuntimeException e) {
       log.error("Unexpected exception in NodeLostTrigger", e);
     }
   }
 
+  private void removeNodeLostMarker(String nodeName) {
+    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
+    try {
+      if (container.getZkController().getZkClient().exists(path, true)) {
+        container.getZkController().getZkClient().delete(path, -1, true);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception removing nodeLost marker " + nodeName, e);
+    }
+  }
+
   @Override
   public boolean isClosed() {
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 4a89ce7..91146b6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -169,10 +170,59 @@ public class OverseerTriggerThread implements Runnable, Closeable {
           scheduledTriggers.remove(managedTriggerName);
         }
       }
+      // check for nodeLost triggers in the current config, and if
+      // absent then clean up old nodeLost / nodeAdded markers
+      boolean cleanOldNodeLostMarkers = true;
+      boolean cleanOldNodeAddedMarkers = true;
       // add new triggers and/or replace and close the replaced triggers
       for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
+        if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODELOST)) {
+          cleanOldNodeLostMarkers = false;
+        }
+        if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODEADDED)) {
+          cleanOldNodeAddedMarkers = false;
+        }
         scheduledTriggers.add(entry.getValue());
       }
+      if (cleanOldNodeLostMarkers) {
+        log.debug("-- clean old nodeLost markers");
+        try {
+          List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+          markers.forEach(n -> {
+            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
+          });
+        } catch (KeeperException.NoNodeException e) {
+          // ignore
+        } catch (KeeperException | InterruptedException e) {
+          log.warn("Error removing old nodeLost markers", e);
+        }
+      }
+      if (cleanOldNodeAddedMarkers) {
+        log.debug("-- clean old nodeAdded markers");
+        try {
+          List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+          markers.forEach(n -> {
+            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
+          });
+        } catch (KeeperException.NoNodeException e) {
+          // ignore
+        } catch (KeeperException | InterruptedException e) {
+          log.warn("Error removing old nodeAdded markers", e);
+        }
+
+      }
+    }
+  }
+
+  private void removeNodeMarker(String path, String nodeName) {
+    path = path + "/" + nodeName;
+    try {
+      zkClient.delete(path, -1, true);
+      log.debug("  -- deleted " + path);
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Error removing old marker " + path, e);
     }
   }
 
@@ -250,7 +300,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
 
     for (Map.Entry<String, Object> entry : triggers.entrySet()) {
       Map<String, Object> props = (Map<String, Object>) entry.getValue();
-      String event = (String) props.get("event");
+      String event = (String) props.get(AutoScalingParams.EVENT);
       AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
       String triggerName = entry.getKey();
       triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props));
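The marker-cleanup addition above reduces to one rule: on Overseer (re)start, a marker
directory is purged only when no trigger of the matching event type remains to consume it;
otherwise the markers are left for that trigger's init() to pick up. A condensed,
self-contained sketch of the rule (the delete() helper and the literal paths are illustrative
stand-ins for zkClient.delete(path, -1, true) and the ZkStateReader constants):

    import java.util.Arrays;
    import java.util.List;

    public class MarkerCleanupSketch {
      enum EventType { NODEADDED, NODELOST }

      // illustrative stand-in for zkClient.delete(path, -1, true)
      static void delete(String path) { System.out.println("deleted " + path); }

      static void cleanStaleMarkers(List<EventType> configuredTypes,
                                    List<String> lostMarkers, List<String> addedMarkers) {
        // purge a marker directory only if no trigger of that type exists
        if (!configuredTypes.contains(EventType.NODELOST)) {
          lostMarkers.forEach(n -> delete("/autoscaling/nodeLost/" + n));
        }
        if (!configuredTypes.contains(EventType.NODEADDED)) {
          addedMarkers.forEach(n -> delete("/autoscaling/nodeAdded/" + n));
        }
      }

      public static void main(String[] args) {
        // only a nodeAdded trigger configured: stale nodeLost markers are removed,
        // while nodeAdded markers stay for the trigger's init() to consume
        cleanStaleMarkers(Arrays.asList(EventType.NODEADDED),
            Arrays.asList("host1:8983_solr"), Arrays.asList("host2:8983_solr"));
      }
    }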

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
index ef9a3cf..7aff846 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
@@ -97,11 +97,11 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
       LOG.warn("Exception getting trigger state '" + path + "'", e);
     }
     if (data != null) {
-      Map<String, Object> state = (Map<String, Object>)Utils.fromJSON(data);
+      Map<String, Object> restoredState = (Map<String, Object>)Utils.fromJSON(data);
       // make sure lastState is sorted
-      state = Utils.getDeepCopy(state, 10, false, true);;
-      setState(state);
-      lastState = state;
+      restoredState = Utils.getDeepCopy(restoredState, 10, false, true);
+      setState(restoredState);
+      lastState = restoredState;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
index 5038278..a809873 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
@@ -30,7 +30,6 @@ import org.apache.solr.cloud.OverseerTaskProcessor;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index b2c95b7..ae3f72d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -139,6 +139,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     paths.forEach(n -> {
       try {
         ZKUtil.deleteRecursive(zkClient().getSolrZooKeeper(), path + "/" + n);
+      } catch (KeeperException.NoNodeException e) {
+        // ignore
       } catch (KeeperException | InterruptedException e) {
         log.warn("Error deleting old data", e);
       }
@@ -634,10 +636,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public void process(TriggerEvent event) {
+      log.info("-- event: " + event);
       events.add(event);
       getActionStarted().countDown();
       try {
-        Thread.sleep(5000);
+        Thread.sleep(eventQueueActionWait);
         triggerFired.compareAndSet(false, true);
         getActionCompleted().countDown();
       } catch (InterruptedException e) {
@@ -658,6 +661,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     }
   }
 
+  public static long eventQueueActionWait = 5000;
+
   @Test
   public void testEventQueue() throws Exception {
     CloudSolrClient solrClient = cluster.getSolrClient();
@@ -696,19 +701,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     assertNotNull(nodeAddedEvent);
     // but action did not complete yet so the event is still enqueued
     assertFalse(triggerFired.get());
+    events.clear();
     actionStarted = new CountDownLatch(1);
+    eventQueueActionWait = 1;
     // kill overseer leader
     cluster.stopJettySolrRunner(overseerLeaderIndex);
     Thread.sleep(5000);
+    // new overseer leader should be elected and run triggers
     await = actionInterrupted.await(3, TimeUnit.SECONDS);
     assertTrue("action wasn't interrupted", await);
-    // new overseer leader should be elected and run triggers
-    newNode = cluster.startJettySolrRunner();
-    // it should fire again but not complete yet
+    // it should fire again from enqueued event
     await = actionStarted.await(60, TimeUnit.SECONDS);
     TriggerEvent replayedEvent = events.iterator().next();
     assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
-    assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
+    assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
     await = actionCompleted.await(10, TimeUnit.SECONDS);
     assertTrue(triggerFired.get());
   }
@@ -743,6 +749,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
       }
     }
 
+    events.clear();
+
     JettySolrRunner newNode = cluster.startJettySolrRunner();
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
@@ -797,8 +805,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   }
 
   public static class TestEventMarkerAction implements TriggerAction {
-    // sanity check that an action instance is only invoked once
-    private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
 
     public TestEventMarkerAction() {
       actionConstructorCalled.countDown();
@@ -806,7 +812,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public String getName() {
-      return "TestTriggerAction";
+      return "TestEventMarkerAction";
     }
 
     @Override
@@ -841,17 +847,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public void init(Map<String, String> args) {
-      log.info("TestTriggerAction init");
+      log.info("TestEventMarkerAction init");
       actionInitCalled.countDown();
     }
   }
 
   @Test
-  public void testNodeEventsRegistration() throws Exception {
+  public void testNodeMarkersRegistration() throws Exception {
    // for this test we want to create two triggers so we must assert that the actions were created twice
     actionInitCalled = new CountDownLatch(2);
     // similarly we want both triggers to fire
-    triggerFiredLatch = new CountDownLatch(3);
+    triggerFiredLatch = new CountDownLatch(2);
     TestLiveNodesListener listener = registerLiveNodesListener();
 
     NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
@@ -875,7 +881,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
     assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
     listener.reset();
-    // stop overseer, which should also cause nodeLost event
+    // stop overseer
+    log.info("====== KILL OVERSEER 1");
     cluster.stopJettySolrRunner(overseerLeaderIndex);
     if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
       fail("onChange listener didn't execute on cluster change");
@@ -883,12 +890,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     assertEquals(1, listener.lostNodes.size());
     assertEquals(overseerLeader, listener.lostNodes.iterator().next());
     assertEquals(0, listener.addedNodes.size());
-    // verify that a znode exists
+    // wait until the new overseer is up
+    Thread.sleep(5000);
+    // verify that a znode does NOT exist - there's no nodeLost trigger,
+    // so the new overseer cleaned up existing nodeLost markers
     String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
-    assertTrue("Path " + pathLost + " wasn't created", zkClient().exists(pathLost, true));
+    assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
 
     listener.reset();
     // create another node
+    log.info("====== ADD NODE 1");
     JettySolrRunner node1 = cluster.startJettySolrRunner();
     if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
       fail("onChange listener didn't execute on cluster change");
@@ -902,6 +913,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     // set up triggers
     CloudSolrClient solrClient = cluster.getSolrClient();
 
+    log.info("====== ADD TRIGGERS");
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
         "'name' : 'node_added_trigger'," +
@@ -926,25 +938,40 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
 
-    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
-      fail("Both triggers should have fired by now");
-    }
-    assertEquals(3, events.size());
-    // 2 nodeAdded, 1 nodeLost
-    int nodeAdded = 0;
-    int nodeLost = 0;
-    for (TriggerEvent ev : events) {
-      String nodeName = (String)ev.getProperty(TriggerEvent.NODE_NAME);
-      if (ev.eventType.equals(AutoScaling.EventType.NODELOST)) {
-        assertEquals(overseerLeader, nodeName);
-        nodeLost++;
-      } else if (ev.eventType.equals(AutoScaling.EventType.NODEADDED)) {
-        assertTrue(nodeName + " not one of: " + node.getNodeName() + ", " + node1.getNodeName(),
-            nodeName.equals(node.getNodeName()) || nodeName.equals(node1.getNodeName()));
-        nodeAdded++;
+    overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    overseerLeader = (String) overSeerStatus.get("leader");
+    overseerLeaderIndex = 0;
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (jetty.getNodeName().equals(overseerLeader)) {
+        overseerLeaderIndex = i;
+        break;
       }
     }
-    assertEquals(1, nodeLost);
-    assertEquals(2, nodeAdded);
+
+    Thread.sleep(5000);
+    // old nodeAdded markers should be consumed now by nodeAdded trigger
+    // but it doesn't result in new events because all nodes have been added
+    // before we configured the trigger
+    assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded,
true));
+
+    listener.reset();
+    events.clear();
+    triggerFiredLatch = new CountDownLatch(1);
+    // kill overseer again
+    log.info("====== KILL OVERSEER 2");
+    cluster.stopJettySolrRunner(overseerLeaderIndex);
+    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+
+
+    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+      fail("Trigger should have fired by now");
+    }
+    assertEquals(1, events.size());
+    TriggerEvent ev = events.iterator().next();
+    assertEquals(overseerLeader, ev.getProperty(TriggerEvent.NODE_NAME));
+    assertEquals(AutoScaling.EventType.NODELOST, ev.getEventType());
   }
 }

