lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dragonsi...@apache.org
Subject lucene-solr:branch_5x: SOLR-8694: DistributedMap/Queue can create too many Watchers and some code simplification.
Date Thu, 21 Apr 2016 00:16:33 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_5x 9c7a031f9 -> bbae36aa9


SOLR-8694: DistributedMap/Queue can create too many Watchers and some code simplification.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/bbae36aa
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/bbae36aa
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/bbae36aa

Branch: refs/heads/branch_5x
Commit: bbae36aa92c58cdbe031ea447bcbd9ae66f5138c
Parents: 9c7a031
Author: markrmiller <markrmiller@apache.org>
Authored: Fri Feb 19 14:33:50 2016 -0500
Committer: Scott Blum <dragonsinth@apache.org>
Committed: Wed Apr 20 19:25:36 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../org/apache/solr/cloud/DistributedMap.java   | 152 +------------------
 .../org/apache/solr/cloud/DistributedQueue.java |   6 +-
 .../java/org/apache/solr/cloud/Overseer.java    |   6 +-
 .../solr/cloud/SizeLimitedDistributedMap.java   |  29 ++--
 .../solr/handler/admin/CollectionsHandler.java  |   8 +-
 6 files changed, 34 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbae36aa/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3f2c2d3..313c608 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -83,6 +83,9 @@ Bug Fixes
 
 * SOLR-8973: Zookeeper frenzy when a core is first created. (Janmejay Singh, Scott Blum, shalin)
 
+* SOLR-8694: DistributedMap/Queue can create too many Watchers and some code simplification.
+  (Scott Blum via Mark Miller)
+
 ======================= 5.5.0 =======================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbae36aa/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
index 8434eb8..c3b5690 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
@@ -22,15 +22,9 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.lang.invoke.MethodHandles;
 import java.util.List;
 
 /**
@@ -39,19 +33,13 @@ import java.util.List;
  * don't have to be ordered i.e. DistributedQueue.
  */
 public class DistributedMap {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  protected static long DEFAULT_TIMEOUT = 5*60*1000;
-
   protected final String dir;
 
   protected SolrZkClient zookeeper;
 
   protected final String prefix = "mn-";
 
-  protected final String response_prefix = "mnr-" ;
-
-  public DistributedMap(SolrZkClient zookeeper, String dir, List<ACL> acl) {
+  public DistributedMap(SolrZkClient zookeeper, String dir) {
     this.dir = dir;
 
     ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
@@ -67,89 +55,13 @@ public class DistributedMap {
     this.zookeeper = zookeeper;
   }
 
-  protected class LatchChildWatcher implements Watcher {
-
-    Object lock = new Object();
-    private WatchedEvent event = null;
-
-    public LatchChildWatcher() {}
-
-    public LatchChildWatcher(Object lock) {
-      this.lock = lock;
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-      LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
-          + event.getState() + " type " + event.getType());
-      synchronized (lock) {
-        this.event = event;
-        lock.notifyAll();
-      }
-    }
-
-    public void await(long timeout) throws InterruptedException {
-      synchronized (lock) {
-        lock.wait(timeout);
-      }
-    }
-
-    public WatchedEvent getWatchedEvent() {
-      return event;
-    }
-  }
-
-  /**
-   * Inserts data into zookeeper.
-   *
-   * @return true if data was successfully added
-   */
-  protected String createData(String path, byte[] data, CreateMode mode)
-      throws KeeperException, InterruptedException {
-      for (;;) {
-      try {
-        return zookeeper.create(path, data, mode, true);
-      } catch (KeeperException.NoNodeException e) {
-        try {
-          zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
-        } catch (KeeperException.NodeExistsException ne) {
-          // someone created it
-        }
-      }
-    }
-  }
-
-
-  public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
-    return createData(dir + "/" + prefix + trackingId, data,
-        CreateMode.PERSISTENT) != null;
-  }
 
-  /**
-   * Offer the data and wait for the response
-   *
-   */
-  public MapEvent put(String trackingId, byte[] data, long timeout) throws KeeperException,
-      InterruptedException {
-    String path = createData(dir + "/" + prefix + trackingId, data,
-        CreateMode.PERSISTENT);
-    String watchID = createData(
-        dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
-        null, CreateMode.EPHEMERAL);
-    Object lock = new Object();
-    LatchChildWatcher watcher = new LatchChildWatcher(lock);
-    synchronized (lock) {
-      if (zookeeper.exists(watchID, watcher, true) != null) {
-        watcher.await(timeout);
-      }
-    }
-    byte[] bytes = zookeeper.getData(watchID, null, null, true);
-    zookeeper.delete(watchID, -1, true);
-    return new MapEvent(watchID, bytes, watcher.getWatchedEvent());
+  public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
+    zookeeper.makePath(dir + "/" + prefix + trackingId, data, CreateMode.PERSISTENT, null, false, true);
   }
 
-  public MapEvent get(String trackingId) throws KeeperException, InterruptedException {
-    return new MapEvent(trackingId, zookeeper.getData(dir + "/" + prefix + trackingId, null, null, true), null);
+  public byte[] get(String trackingId) throws KeeperException, InterruptedException {
+    return zookeeper.getData(dir + "/" + prefix + trackingId, null, null, true);
   }
 
   public boolean contains(String trackingId) throws KeeperException, InterruptedException {
@@ -187,58 +99,4 @@ public class DistributedMap {
 
   }
 
-  public static class MapEvent {
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((id == null) ? 0 : id.hashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) return true;
-      if (obj == null) return false;
-      if (getClass() != obj.getClass()) return false;
-      MapEvent other = (MapEvent) obj;
-      if (id == null) {
-        if (other.id != null) return false;
-      } else if (!id.equals(other.id)) return false;
-      return true;
-    }
-
-    private WatchedEvent event = null;
-    private String id;
-    private byte[] bytes;
-
-    MapEvent(String id, byte[] bytes, WatchedEvent event) {
-      this.id = id;
-      this.bytes = bytes;
-      this.event = event;
-    }
-
-    public void setId(String id) {
-      this.id = id;
-    }
-
-    public String getId() {
-      return id;
-    }
-
-    public void setBytes(byte[] bytes) {
-      this.bytes = bytes;
-    }
-
-    public byte[] getBytes() {
-      return bytes;
-    }
-
-    public WatchedEvent getWatchedEvent() {
-      return event;
-    }
-
-  }
-
-  
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbae36aa/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index 019ffdf..d3bf2e4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -319,7 +319,7 @@ public class DistributedQueue {
         }
         return orderedChildren;
       } catch (KeeperException.NoNodeException e) {
-        zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+        zookeeper.makePath(dir, false, true);
         // go back to the loop and try again
       }
     }
@@ -409,6 +409,10 @@ public class DistributedQueue {
 
     @Override
     public void process(WatchedEvent event) {
+      // session events are not change events, and do not remove the watcher; except for Expired
+      if (Event.EventType.None.equals(event.getType()) && !Event.KeeperState.Expired.equals(event.getState())) {
+        return;
+      }
       updateLock.lock();
       try {
         // this watcher is automatically cleared when fired

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbae36aa/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a8c3ab5..b8098d6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -912,19 +912,19 @@ public class Overseer implements Closeable {
   /* Internal map for failed tasks, not to be used outside of the Overseer */
   static DistributedMap getRunningMap(final SolrZkClient zkClient) {
     createOverseerNode(zkClient);
-    return new DistributedMap(zkClient, "/overseer/collection-map-running", null);
+    return new DistributedMap(zkClient, "/overseer/collection-map-running");
   }
 
   /* Size-limited map for successfully completed tasks*/
   static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
     createOverseerNode(zkClient);
-    return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", null, NUM_RESPONSES_TO_STORE);
+    return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", NUM_RESPONSES_TO_STORE);
   }
 
   /* Map for failed tasks, not to be used outside of the Overseer */
   static DistributedMap getFailureMap(final SolrZkClient zkClient) {
     createOverseerNode(zkClient);
-    return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", null, NUM_RESPONSES_TO_STORE);
+    return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE);
   }
   
   /* Collection creation queue */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbae36aa/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
index 418eb66..3326dca 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
@@ -33,41 +33,40 @@ public class SizeLimitedDistributedMap extends DistributedMap {
 
   private final int maxSize;
 
-  public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, List<ACL> acl, int maxSize) {
-    super(zookeeper, dir, acl);
+  public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize) {
+    super(zookeeper, dir);
     this.maxSize = maxSize;
   }
-  
+
   @Override
-  public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
-    if(this.size() >= maxSize) {
+  public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
+    if (this.size() >= maxSize) {
       // Bring down the size
       List<String> children = zookeeper.getChildren(dir, null, true);
 
       int cleanupSize = maxSize / 10;
-      
+
       final PriorityQueue priorityQueue = new PriorityQueue<Long>(cleanupSize) {
         @Override
         protected boolean lessThan(Long a, Long b) {
           return (a > b);
         }
       };
-      
-      for(String child: children) {
+
+      for (String child : children) {
         Stat stat = zookeeper.exists(dir + "/" + child, null, true);
         priorityQueue.insertWithOverflow(stat.getMzxid());
       }
-      
+
       long topElementMzxId = (Long) priorityQueue.top();
-      
-      for(String child:children) {
+
+      for (String child : children) {
         Stat stat = zookeeper.exists(dir + "/" + child, null, true);
-        if(stat.getMzxid() <= topElementMzxId)
+        if (stat.getMzxid() <= topElementMzxId)
           zookeeper.delete(dir + "/" + child, -1, true);
       }
     }
-      
-    return createData(dir + "/" + prefix + trackingId, data,
-        CreateMode.PERSISTENT) != null;
+
+    super.put(trackingId, data);
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbae36aa/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 7b9c1f2..04bc444 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -575,12 +575,12 @@ public class CollectionsHandler extends RequestHandlerBase {
 
         final NamedList<Object> results = new NamedList<>();
         if (zkController.getOverseerCompletedMap().contains(requestId)) {
-          final DistributedMap.MapEvent mapEvent = zkController.getOverseerCompletedMap().get(requestId);
-          rsp.getValues().addAll(SolrResponse.deserialize(mapEvent.getBytes()).getResponse());
+          final byte[] mapEntry = zkController.getOverseerCompletedMap().get(requestId);
+          rsp.getValues().addAll(SolrResponse.deserialize(mapEntry).getResponse());
           addStatusToResponse(results, COMPLETED, "found [" + requestId + "] in completed tasks");
         } else if (zkController.getOverseerFailureMap().contains(requestId)) {
-          final DistributedMap.MapEvent mapEvent = zkController.getOverseerFailureMap().get(requestId);
-          rsp.getValues().addAll(SolrResponse.deserialize(mapEvent.getBytes()).getResponse());
+          final byte[] mapEntry = zkController.getOverseerFailureMap().get(requestId);
+          rsp.getValues().addAll(SolrResponse.deserialize(mapEntry).getResponse());
           addStatusToResponse(results, FAILED, "found [" + requestId + "] in failed tasks");
         } else if (zkController.getOverseerRunningMap().contains(requestId)) {
           addStatusToResponse(results, RUNNING, "found [" + requestId + "] in running tasks");


Mime
View raw message