lucene-solr-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r903310 - in /lucene/solr/branches/cloud/src/java/org/apache/solr: cloud/ZkController.java core/CoreContainer.java
Date Tue, 26 Jan 2010 16:32:42 GMT
Author: markrmiller
Date: Tue Jan 26 16:32:42 2010
New Revision: 903310

URL: http://svn.apache.org/viewvc?rev=903310&view=rev
Log:
updates and cleanup

Modified:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=903310&r1=903309&r2=903310&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Tue Jan 26 16:32:42 2010
@@ -80,9 +80,6 @@
   public static final String ROLE_PROP = "role";
   public static final String NODE_NAME = "node_name";
 
-  // for when we do incremental cloud state updates
-  //final ShardsWatcher shardWatcher = new ShardsWatcher(this);
-
   private SolrZkClient zkClient;
 
   private volatile CloudState cloudState;
@@ -126,12 +123,11 @@
 
           public void command() {
             try {
-              // nocommit : re-register ephemeral nodes, (possibly) wait a while
-              // for others to do the same, then load
+
               createEphemeralNode();
               // register cores in case any new cores came online will zk was down
               
-              // coreContainer may currently be null in tests, so don't reregister
+              // coreContainer may currently be null in tests, so don't re-register
               if(coreContainer != null) {
                 Collection<SolrCore> cores = coreContainer.getCores();
                 for(SolrCore core : cores) {
@@ -179,7 +175,7 @@
         // makes shards zkNode if it doesn't exist
         zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
         
-        // nocommit
+        // nocommit - scrutinize
         // ping that there is a new collection or a new shardId
         zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
       }
@@ -234,15 +230,6 @@
     return cloudState;
   }
 
-  private List<String> getCollectionNames() throws KeeperException,
-      InterruptedException {
-
-    List<String> collectionNodes = zkClient.getChildren(COLLECTIONS_ZKNODE,
-        null);
-
-    return collectionNodes;
-  }
-
   /**
    * Load SolrConfig from ZooKeeper.
    * 
@@ -361,6 +348,9 @@
       
       // makes nodes node
       try {
+        // TODO: for now, no watch - if a node goes down or comes up, its going to change
+        // shards info anyway and cause a state update - this could change if we do incremental
+        // state update
         zkClient.makePath(NODES_ZKNODE);
       } catch (KeeperException e) {
         // its okay if another beats us creating the node
@@ -670,16 +660,16 @@
           "", e);
     }
     
-    log.info("Start watching collections node for changes");
+    log.info("Start watching collections zk node for changes");
     zkClient.getChildren(COLLECTIONS_ZKNODE, new Watcher(){
 
-      public synchronized void process(WatchedEvent event) {
+      public void process(WatchedEvent event) {
           try {
-            // TODO: fine grained - just reload what's changed
-            // nocommit
-            log.info("Notified of collection change");
-            addShardZkNodeWatches();
-            updateCloudState(false);
+            log.info("Detected a new or removed collection");
+            synchronized (ZkController.this) {
+              addShardZkNodeWatches();
+              updateCloudState(false);
+            }
             // re-watch
             zkClient.getChildren(event.getPath(), this);
           } catch (KeeperException e) {
@@ -702,14 +692,16 @@
     
     zkClient.exists(COLLECTIONS_ZKNODE, new Watcher(){
 
-      public synchronized void process(WatchedEvent event) {
+      public void process(WatchedEvent event) {
         if(event.getType() !=  EventType.NodeDataChanged) {
           return;
         }
         log.info("Notified of CloudState change");
         try {
-          addShardZkNodeWatches();
-          updateCloudState(false);
+          synchronized (ZkController.this) {
+            addShardZkNodeWatches();
+            updateCloudState(false);
+          }
           zkClient.exists(COLLECTIONS_ZKNODE, this);
         } catch (KeeperException e) {
           log.error("", e);
@@ -737,30 +729,53 @@
     List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
     for(final String collection : collections) {
       if(!knownCollections.contains(collection)) {
-        zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE, new Watcher(){
-
+        Watcher watcher = new Watcher() {
           public void process(WatchedEvent event) {
-            //nocommit
-            System.out.println("ShardId node added/removed/changed:");
+            log.info("Detected changed ShardId in collection:" + collection);
             try {
               addShardsWatches(collection);
+              updateCloudState(false);
             } catch (KeeperException e) {
               log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
             } catch (InterruptedException e) {
               // Restore the interrupted status
               Thread.currentThread().interrupt();
               log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (IOException e) {
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
             }
-          }});
+          }
+        };
+        boolean madeWatch = true;
+
+        for (int i = 0; i < 5; i++) {
+          try {
+            zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
+                + SHARDS_ZKNODE, watcher);
+          } catch (KeeperException.NoNodeException e) {
+            // most likely, the collections node has been created, but not the
+            // shards node yet -- pause and try again
+            madeWatch = false;
+            if(i == 4) {
+              throw e;
+            }
+            Thread.sleep(50);
+          }
+          if(madeWatch) {
+            break;
+          }
+        }
       }
     }
   }
   
-  public void addShardsWatches(String collection) throws KeeperException,
+  public void addShardsWatches(final String collection) throws KeeperException,
       InterruptedException {
     if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
       List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
@@ -779,9 +794,24 @@
               + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
 
             public void process(WatchedEvent event) {
-              // nocommit
-              System.out.println("shard changed under:" + shardId);
-
+              log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
+              try {
+                updateCloudState(false);
+              } catch (KeeperException e) {
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              } catch (InterruptedException e) {
+                // Restore the interrupted status
+                Thread.currentThread().interrupt();
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              } catch (IOException e) {
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              }
             }
           });
         }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java?rev=903310&r1=903309&r2=903310&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java Tue Jan 26 16:32:42 2010
@@ -425,8 +425,10 @@
     
     if(zooKeeperController != null) {
       try {
-        zooKeeperController.addShardZkNodeWatches();
-        zooKeeperController.updateCloudState(true);
+        synchronized (zooKeeperController) {
+          zooKeeperController.addShardZkNodeWatches();
+          zooKeeperController.updateCloudState(true);
+        }
       } catch (InterruptedException e) {
         // Restore the interrupted status
         Thread.currentThread().interrupt();



Mime
View raw message