lucene-solr-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r895450 - in /lucene/solr/branches/cloud/src: java/org/apache/solr/cloud/ test/org/apache/solr/cloud/
Date Sun, 03 Jan 2010 17:25:05 GMT
Author: markrmiller
Date: Sun Jan  3 17:25:03 2010
New Revision: 895450

URL: http://svn.apache.org/viewvc?rev=895450&view=rev
Log:
cleanup + the start of some reconnection test code

Modified:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java?rev=895450&r1=895449&r2=895450&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java Sun Jan  3 17:25:03 2010
@@ -1,5 +1,22 @@
 package org.apache.solr.cloud;
 
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
@@ -45,7 +62,7 @@
 
   public synchronized void process(WatchedEvent event) {
     if (log.isInfoEnabled()) {
-      log.info("Watcher " + name + " got event " + event);
+      log.info("Watcher " + this + " name:" + name + " got event " + event);
     }
 
     state = event.getState();
@@ -62,11 +79,13 @@
       try {
         connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
           @Override
-          public void update(ZooKeeper keeper) {
+          public void update(ZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
+           waitForConnected(SolrZkClient.CONNECT_TIMEOUT);
            client.updateKeeper(keeper);
+           ConnectionManager.this.connected = true;
           }
         });
-      } catch (IOException e) {
+      } catch (Exception e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
       }
@@ -76,6 +95,21 @@
     } else if (state == KeeperState.Disconnected) {
       connected = false;
       // nocommit: start reconnect attempts
+      
+      try {
+        connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
+          @Override
+          public void update(ZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
+           waitForConnected(SolrZkClient.CONNECT_TIMEOUT);
+           client.updateKeeper(keeper);
+           ConnectionManager.this.connected = true;
+          }
+        });
+      } catch (Exception e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      
     } else {
       connected = false;
     }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java?rev=895450&r1=895449&r2=895450&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java Sun Jan  3 17:25:03 2010
@@ -18,6 +18,7 @@
  */
 
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -28,12 +29,12 @@
 public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
 
   @Override
-  public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException {
+  public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
     updater.update(new ZooKeeper(serverAddress, timeout, watcher));
   }
 
   @Override
-  public void reconnect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException {
+  public void reconnect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
     updater.update(new ZooKeeper(serverAddress, timeout, watcher));
   }
 

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java?rev=895450&r1=895449&r2=895450&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java Sun Jan  3 17:25:03 2010
@@ -46,7 +46,7 @@
 public class SolrZkClient {
   static final String NEWL = System.getProperty("line.separator");
 
-  private static final int CONNECT_TIMEOUT = 5000;
+  static final int CONNECT_TIMEOUT = 5000;
 
   protected static final Logger log = LoggerFactory
       .getLogger(SolrZkClient.class);
@@ -79,11 +79,18 @@
   public SolrZkClient(String zkServerAddress, int zkClientTimeout,
       ZkClientConnectionStrategy strat) throws InterruptedException,
       TimeoutException, IOException {
-    connManager = new ConnectionManager("ZooKeeperConnection Watcher", this,
+    connManager = new ConnectionManager("ZooKeeperConnection Watcher:" + zkServerAddress, this,
         zkServerAddress, zkClientTimeout, strat);
     strat.connect(zkServerAddress, zkClientTimeout, connManager, new ZkUpdate() {
       @Override
       public void update(ZooKeeper zooKeeper) {
+        if(keeper != null) {
+          try {
+            keeper.close();
+          } catch (InterruptedException e) {
+            // nocommit
+          }
+        }
         keeper = zooKeeper;
       }
     });
@@ -109,11 +116,21 @@
   }
 
   /**
-   * @param path
-   * @param watcher
-   * @return
-   * @throws KeeperException
-   * @throws InterruptedException
+   * Return the stat of the node of the given path. Return null if no such a
+   * node exists.
+   * <p>
+   * If the watch is non-null and the call is successful (no exception is thrown),
+   * a watch will be left on the node with the given path. The watch will be
+   * triggered by a successful operation that creates/delete the node or sets
+   * the data on the node.
+   *
+   * @param path the node path
+   * @param watcher explicit watcher
+   * @return the stat of the node of the given path; return null if no such a
+   *         node exists.
+   * @throws KeeperException If the server signals an error
+   * @throws InterruptedException If the server transaction is interrupted.
+   * @throws IllegalArgumentException if an invalid path is specified
    */
   public Stat exists(final String path, Watcher watcher)
       throws KeeperException, InterruptedException {
@@ -249,9 +266,9 @@
   public void makePath(String path, byte[] data, CreateMode createMode,
       Watcher watcher) throws KeeperException, InterruptedException {
     if (log.isInfoEnabled()) {
-      log.info("makePath: " + path);
+      log.info("makePath: " + path + " keeper:" + keeper);
     }
-
+    
     if (path.startsWith("/")) {
       path = path.substring(1, path.length());
     }
@@ -390,6 +407,8 @@
    * @throws InterruptedException
    */
   public void close() throws InterruptedException {
+    // nocommit
+    log.info("closing SolrZKClient " + this);
     keeper.close();
   }
 
@@ -399,6 +418,8 @@
    * @param keeper
    */
   void updateKeeper(ZooKeeper keeper) {
+    // nocommit
+   log.info("Updating ZooKeeper instance");
    this.keeper = keeper;
   }
 

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java?rev=895450&r1=895449&r2=895450&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java Sun Jan  3 17:25:03 2010
@@ -18,6 +18,7 @@
  */
 
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -26,11 +27,11 @@
  *
  */
 public abstract class ZkClientConnectionStrategy {
-  public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException;
-  public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException;
+  public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+  public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
   
   public static abstract class ZkUpdate {
-    public abstract void update(ZooKeeper zooKeeper);
+    public abstract void update(ZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
   }
   
 }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=895450&r1=895449&r2=895450&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Sun Jan  3 17:25:03 2010
@@ -89,7 +89,7 @@
 
       try {
         // refresh watcher
-        controller.getKeeperConnection().exists(event.getPath(), this);
+        //controller.getKeeperConnection().exists(event.getPath(), this);
 
         // TODO: need to load whole state?
         controller.loadCollectionInfo();
@@ -237,6 +237,25 @@
       // Restore the interrupted status
       Thread.currentThread().interrupt();
     }
+    
+    // no watch the shards node
+    try {
+      zkClient.exists(shardsZkPath, new Watcher(){
+
+        public void process(WatchedEvent event) {
+          // nocommit
+          // the shards node has been updated
+          // we need to look for new nodes
+          
+        }});
+    } catch (KeeperException e) {
+      log.error("ZooKeeper Exception", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "ZooKeeper Exception", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    }
   }
 
   /**
@@ -371,8 +390,10 @@
           + " but core's ZooKeeper node has already been removed");
     } catch (KeeperException e) {
       log.error("ZooKeeper Exception", e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "ZooKeeper Exception", e);
+      // we can't get through to ZooKeeper, so log error
+      // and allow close process to continue -
+      // if ZooKeeper is down, our ephemeral node
+      // should be removed anyway
     }
   }
 

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=895450&r1=895449&r2=895450&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Sun Jan  3 17:25:03 2010
@@ -125,8 +125,8 @@
 
   public void tearDown() throws Exception {
     printLayout();
-    zkServer.shutdown();
     super.tearDown();
+    zkServer.shutdown();
   }
 
   private void printLayout() throws Exception {

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java?rev=895450&r1=895449&r2=895450&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java Sun Jan  3 17:25:03 2010
@@ -1,9 +1,35 @@
 package org.apache.solr.cloud;
 
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import junit.framework.TestCase;
 
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
 public class ZkSolrClientTest extends TestCase {
   protected File tmpDir = new File(System.getProperty("java.io.tmpdir")
       + System.getProperty("file.separator") + getClass().getName() + "-"
@@ -21,17 +47,75 @@
       AbstractZkTestCase.makeSolrZkNode();
 
       zkClient = new SolrZkClient(AbstractZkTestCase.ZOO_KEEPER_ADDRESS,
-          AbstractZkTestCase.TIMEOUT);
+          AbstractZkTestCase.TIMEOUT, new ZkClientConnectionStrategy() {
+            ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+            @Override
+            public void reconnect(final String serverAddress, final int zkClientTimeout,
+                final Watcher watcher, final ZkUpdate updater) throws IOException {
+              System.out.println("reconnecting");
+              executor.scheduleAtFixedRate(new Runnable() {
+                public void run() {
+                  // nocommit
+                  System.out.println("Attempting the connect...");
+                  try {
+                    updater.update(new ZooKeeper(serverAddress, zkClientTimeout, watcher));
+                    // nocommit
+                    System.out.println("Connect done");
+                  } catch (Exception e) {
+                    // nocommit
+                    e.printStackTrace();
+                    System.out.println("failed reconnect");
+                  }
+                  executor.shutdownNow();
+                  
+                }
+              }, 0, 400, TimeUnit.MILLISECONDS);
+              
+            }
+            
+            @Override
+            public void connect(String zkServerAddress, int zkClientTimeout,
+                Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
+              System.out.println("connecting");
+              updater.update(new ZooKeeper(zkServerAddress, zkClientTimeout, watcher));
+              
+            }
+          });
       String shardsPath = "/collections/collection1/shards";
       zkClient.makePath(shardsPath);
 
       zkClient.makePath("collections/collection1/config=collection1");
       
-      zkClient.dissconect();
+      server.shutdown();
+      
+      Thread.sleep(80);
+      
+      boolean exceptionHappened = false;
+      try {
+        zkClient.makePath("collections/collection1/config=collection2");
+      } catch (KeeperException.ConnectionLossException e) {
+        // nocommit : the connection should be down
+        exceptionHappened = true;
+      }
+      
+      assertTrue("Server should be down here", exceptionHappened);
+      
+      server = new ZkTestServer(zkDir);
+      server.run();
+      
+      Thread.sleep(80);
+      zkClient.makePath("collections/collection1/config=collection3");
+      
+      zkClient.printLayoutToStdOut();
       
-      zkClient.makePath("collections/collection1/config=collection2");
+      assertNotNull(zkClient.exists("/collections/collection1/config=collection3", null));
+      assertNotNull(zkClient.exists("/collections/collection1/config=collection1", null));
 
+    } catch(Exception e) {
+      // nocommit
+      e.printStackTrace();
     } finally {
+    
       if (zkClient != null) {
         zkClient.close();
       }



Mime
View raw message