lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1209109 - in /lucene/dev/branches/solrcloud: lucene/src/test-framework/java/org/apache/lucene/store/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/core/ solr/core/src/test/org/apache/solr/cloud/ solr/solrj/sr...
Date Thu, 01 Dec 2011 14:51:26 GMT
Author: markrmiller
Date: Thu Dec  1 14:51:22 2011
New Revision: 1209109

URL: http://svn.apache.org/viewvc?rev=1209109&view=rev
Log:
refactor out recovery into its own class and harden recovery test

Added:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
  (with props)
Modified:
    lucene/dev/branches/solrcloud/lucene/src/test-framework/java/org/apache/lucene/store/MockDirectoryWrapper.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Modified: lucene/dev/branches/solrcloud/lucene/src/test-framework/java/org/apache/lucene/store/MockDirectoryWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/lucene/src/test-framework/java/org/apache/lucene/store/MockDirectoryWrapper.java?rev=1209109&r1=1209108&r2=1209109&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/lucene/src/test-framework/java/org/apache/lucene/store/MockDirectoryWrapper.java
(original)
+++ lucene/dev/branches/solrcloud/lucene/src/test-framework/java/org/apache/lucene/store/MockDirectoryWrapper.java
Thu Dec  1 14:51:22 2011
@@ -104,6 +104,10 @@ public class MockDirectoryWrapper extend
   }
 
   public MockDirectoryWrapper(Random random, Directory delegate) {
+    String checkIndex = System.getProperty("mockdir.checkindex");
+    if (checkIndex != null && checkIndex.equalsIgnoreCase("false")) {
+      this.checkIndexOnClose = false;
+    }
     this.delegate = delegate;
     // must make a private random since our methods are
     // called from different threads; else test failures may

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java?rev=1209109&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
(added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
Thu Dec  1 14:51:22 2011
@@ -0,0 +1,156 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.UpdateLog.RecoveryInfo;
+import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RecoveryStrat {
+  private static Logger log = LoggerFactory.getLogger(RecoveryStrat.class);
+  
+  private volatile RecoveryListener recoveryListener;
+  
+
+  interface OnFinish {
+    public void run();
+  }
+  
+  // for now, just for tests
+  public interface RecoveryListener {
+    public void startRecovery();
+    public void finishedReplication();
+    public void finishedRecovery();
+  }
+  
+  public void recover(final SolrCore core, final String leaderUrl,
+      final boolean iamLeader, final OnFinish onFinish) {
+    log.info("Start recovery process");
+    if (recoveryListener != null) recoveryListener.startRecovery();
+    core.getUpdateHandler().getUpdateLog().bufferUpdates();
+    Thread thread = new Thread() {
+      {
+        setDaemon(true);
+      }
+      
+      @Override
+      public void run() {
+        try {
+          doRecovery(core, leaderUrl, iamLeader);
+          System.out.println("apply buffered updates");
+          Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog()
+              .applyBufferedUpdates();
+          if (future == null) {
+            // no replay needed
+            log.info("No replay needed");
+          } else {
+            // wait for replay
+            future.get();
+          }
+          if (recoveryListener != null) recoveryListener.finishedRecovery();
+          onFinish.run();
+        } catch (SolrServerException e) {
+          log.error("", e);
+          // nocommit
+          e.printStackTrace();
+        } catch (IOException e) {
+          log.error("", e);
+          // nocommit
+          e.printStackTrace();
+        } catch (Exception e) {
+          log.error("", e);
+          // nocommit
+          e.printStackTrace();
+        }
+        log.info("Finished recovery process");
+        // nocommit: if we get an exception, recovery failed...
+      }
+    };
+    thread.start();
+    
+  }
+  
+  private void doRecovery(SolrCore core, String leaderUrl, boolean iamleader)
+      throws Exception, SolrServerException, IOException {
+    
+    // start buffer updates to tran log
+    // and do recovery - either replay via realtime get
+    // or full index replication
+    
+    // seems perhaps we cannot do this here since we are not fully running -
+    // we may need to trigger a recovery that happens later
+    
+    if (!iamleader) {
+      // if we are the leader, either we are trying to recover faster
+      // than our ephemeral timed out or we are the only node
+      
+      // TODO: first, issue a hard commit?
+      // nocommit: require /update?
+      
+      CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderUrl);
+      
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      
+      params.set("commit", true);
+      params.set("qt", "/update");
+      server.query(params);
+      
+      // use rep handler directly, so we can do this sync rather than async
+      
+      ReplicationHandler replicationHandler = (ReplicationHandler) core
+          .getRequestHandler("/replication");
+      
+      if (replicationHandler == null) {
+        log.error("Skipping recovery, no /replication handler found");
+        return;
+      }
+      
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
+      solrParams.set(ReplicationHandler.CMD_FORCE, true);
+      
+      replicationHandler.doFetch(solrParams);
+      
+      RefCounted<SolrIndexSearcher> searcher = core.getSearcher(true, true,
+          null);
+      System.out.println("DOCS AFTER REPLICATE:"
+          + searcher.get().search(new MatchAllDocsQuery(), 1).totalHits);
+      searcher.decref();
+      if (recoveryListener != null) recoveryListener.finishedReplication();
+    }
+  }
+  
+  public RecoveryListener getRecoveryListener() {
+    return recoveryListener;
+  }
+
+  public void setRecoveryListener(RecoveryListener recoveryListener) {
+    this.recoveryListener = recoveryListener;
+  }
+}

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1209109&r1=1209108&r2=1209109&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
Thu Dec  1 14:51:22 2011
@@ -25,13 +25,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.cloud.RecoveryStrat.OnFinish;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.OnReconnect;
@@ -40,13 +38,10 @@ import org.apache.solr.common.cloud.Solr
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler;
-import org.apache.solr.update.UpdateLog.RecoveryInfo;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -105,6 +100,8 @@ public final class ZkController {
 
   private Map<String, CoreAssignment> assignments = new HashMap<String, CoreAssignment>();
 
+  private RecoveryStrat recoveryStrat = new RecoveryStrat();
+
   public static void main(String[] args) throws Exception {
     // start up a tmp zk server first
     SolrZkServer zkServer = new SolrZkServer("true", null, "example/solr", args[2]);
@@ -510,73 +507,45 @@ public final class ZkController {
     String leaderUrl = zkStateReader.getLeader(collection, cloudDesc.getShardId());
     
     final boolean iamleader;
-    final SolrCore core;
-    boolean doRecovery = true;
-    if (leaderUrl.equals(shardUrl)) {
-      iamleader = true;
-      doRecovery = false;
-      core = null;
-      // publish new props
-      publishAsActive(shardUrl, cloudDesc, shardZkNodeName, shardId);
-    } else {
-      iamleader = false;
-      CoreContainer cc = desc.getCoreContainer();
-      if (cc != null) {
-        core = cc.getCore(desc.getName());
-        try {
+    SolrCore core = null;
+    try {
+      boolean doRecovery = true;
+      if (leaderUrl.equals(shardUrl)) {
+        iamleader = true;
+        doRecovery = false;
+        // publish new props
+        publishAsActive(shardUrl, cloudDesc, shardZkNodeName, shardId);
+      } else {
+        iamleader = false;
+        CoreContainer cc = desc.getCoreContainer();
+        if (cc != null) {
+          core = cc.getCore(desc.getName());
+          
           if (core.isReloaded()) {
             doRecovery = false;
           }
-        } finally {
-          core.close();
+          
+        } else {
+          log.warn("Cannot recover without access to CoreContainer");
+          return shardId;
         }
-      } else {
-        log.warn("Cannot recover without access to CoreContainer");
-        return shardId;
+        
       }
+      
+      if (doRecovery) {
+        recoveryStrat.recover(core, leaderUrl, iamleader, new OnFinish() {
 
-    }
-    
-    if (doRecovery) {
-      log.info("Start recovery process");
-      core.getUpdateHandler().getUpdateLog().bufferUpdates();
-      final String frozenShardId = shardId;
-      Thread thread = new Thread() {
-        {
-          setDaemon(true);
-        }
-        @Override
-        public void run() {
-          try {
-            doRecovery(collection, desc, cloudDesc, iamleader);
-            Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
-            if (future == null) {
-              // no replay needed
-            } else {
-              // wait for replay
-              future.get();
-            }
+          @Override
+          public void run() {
             // publish new props
-            publishAsActive(shardUrl, cloudDesc, shardZkNodeName, frozenShardId);
-          } catch (SolrServerException e) {
-            log.error("", e);
-            // nocommit
-            e.printStackTrace();
-          } catch (IOException e) {
-            log.error("", e);
-            // nocommit
-            e.printStackTrace();
-          } catch (Exception e) {
-            log.error("", e);
-            // nocommit
-            e.printStackTrace();
-          }
-          log.info("Finished recovery process");
-          // nocommit: if we get an exception, recovery failed...
-        }
-      };
-      thread.start();
-  
+            publishAsActive(shardUrl, cloudDesc, shardZkNodeName, cloudDesc.getShardId());
+            
+          }});
+      }
+    } finally {
+      if (core != null) {
+        core.close();
+      }
     }
 
     return shardId;
@@ -619,59 +588,6 @@ public final class ZkController {
     return true;
   }
 
-  private void doRecovery(String collection, final CoreDescriptor desc,
-      final CloudDescriptor cloudDesc, boolean iamleader) throws Exception,
-      SolrServerException, IOException {
-
-    // start buffer updates to tran log
-    // and do recovery - either replay via realtime get 
-    // or full index replication
-
-    // seems perhaps we cannot do this here since we are not fully running - 
-    // we may need to trigger a recovery that happens later
-    
-    String leaderUrl = zkStateReader.getLeader(collection, cloudDesc.getShardId());
-    
-    if (!iamleader) {
-      // if we are the leader, either we are trying to recover faster
-      // then our ephemeral timed out or we are the only node
-      
-      // TODO: first, issue a hard commit?
-      // nocommit: require /update?
-      
-      CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderUrl);
-     
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      
-      params.set("commit", true);
-      params.set("qt", "/update");
-      server.query(params );
-      
-      // if we want to buffer updates while recovering, this
-      // will have to trigger later - http is not yet up ???
-      
-      // use rep handler directly, so we can do this sync rather than async
-      SolrCore core = desc.getCoreContainer().getCore(desc.getName());
-      try {
-        ReplicationHandler replicationHandler = (ReplicationHandler) core
-            .getRequestHandler("/replication");
-        
-        if (replicationHandler == null) {
-          log.error("Skipping recovery, no /replication handler found");
-          return;
-        }
-        
-        ModifiableSolrParams solrParams = new ModifiableSolrParams();
-        solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
-        solrParams.set(ReplicationHandler.CMD_FORCE, true);
-
-        replicationHandler.doFetch(solrParams);
-      } finally {
-        core.close();
-      }
-    }
-  }
-
   private void doLeaderElectionProcess(ElectionContext context) throws KeeperException,
       InterruptedException, IOException {
    
@@ -972,4 +888,9 @@ public final class ZkController {
       this.assignments = newAssignments;
     }
   }
+
+
+  public RecoveryStrat getRecoveryStrat() {
+    return recoveryStrat;
+  }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1209109&r1=1209108&r2=1209109&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java Thu
Dec  1 14:51:22 2011
@@ -1477,16 +1477,17 @@ public final class SolrCore implements S
     
     handler.handleRequest(req,rsp);
     setResponseHeaderValues(handler,req,rsp);
-    
-    if (log.isInfoEnabled()) {
-      StringBuilder sb = new StringBuilder(logid);
-      for (int i=0; i<toLog.size(); i++) {
-        String name = toLog.getName(i);
-        Object val = toLog.getVal(i);
-        sb.append(name).append("=").append(val).append(" ");
-      }
-      log.info(sb.toString());
-    }
+
+    // nocommit - i commented this out
+//    if (log.isInfoEnabled()) {
+//      StringBuilder sb = new StringBuilder(logid);
+//      for (int i=0; i<toLog.size(); i++) {
+//        String name = toLog.getName(i);
+//        Object val = toLog.getVal(i);
+//        sb.append(name).append("=").append(val).append(" ");
+//      }
+//      log.info(sb.toString());
+//    }
 
   }
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1209109&r1=1209108&r2=1209109&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
Thu Dec  1 14:51:22 2011
@@ -37,6 +37,7 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -103,12 +104,16 @@ public class FullDistributedZkTest exten
     }
     
     public JettySolrRunner killShard(String slice, int index) throws Exception {
-      // kill first shard in shard2
       JettySolrRunner jetty = shardToJetty.get(slice).get(index).jetty;
       jetty.stop();
       return jetty;
     }
     
+    public JettySolrRunner getShard(String slice, int index) throws Exception {
+      JettySolrRunner jetty = shardToJetty.get(slice).get(index).jetty;
+      return jetty;
+    }
+    
     public JettySolrRunner killRandomShard() throws Exception {
       // add all the shards to a list
 //      CloudState clusterState = zk.getCloudState();
@@ -258,13 +263,25 @@ public class FullDistributedZkTest exten
       List<SolrServer> clients) throws Exception,
       IOException, KeeperException, URISyntaxException {
     zkStateReader.updateCloudState(true);
+    
+    while(!zkStateReader.getCloudState().getCollections().contains(DEFAULT_COLLECTION)) {
+      Thread.sleep(500);
+    }
+    while(zkStateReader.getCloudState().getSlices(DEFAULT_COLLECTION).size() != sliceCount) {
+      Thread.sleep(500);
+    }
+    
     for (SolrServer client : clients) {
       // find info for this client in zk
 
-      
-      Map<String,Slice> slices = zkStateReader.getCloudState().getSlices(
+      CloudState cloudState = zkStateReader.getCloudState();
+      Map<String,Slice> slices = cloudState.getSlices(
           DEFAULT_COLLECTION);
       
+      if (slices == null) {
+        throw new RuntimeException("No slices found for collection " + DEFAULT_COLLECTION + " in " + cloudState.getCollections());
+      }
+      
       for (Map.Entry<String,Slice> slice : slices.entrySet()) {
         Map<String,ZkNodeProps> theShards = slice.getValue().getShards();
         for (Map.Entry<String,ZkNodeProps> shard : theShards.entrySet()) {

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java?rev=1209109&r1=1209108&r2=1209109&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
Thu Dec  1 14:51:22 2011
@@ -18,26 +18,35 @@ package org.apache.solr.cloud;
  */
 
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.RecoveryStrat.RecoveryListener;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 
 /**
  *
  */
-@Ignore("this test is not ready")
+@Ignore
 public class RecoveryZkTest extends FullDistributedZkTest {
+
   
   @BeforeClass
   public static void beforeSuperClass() throws Exception {
-
+    System.setProperty("mockdir.checkindex", "false");
+  }
+  
+  @AfterClass
+  public static void afterSuperClass() throws Exception {
+    System.clearProperty("mockdir.checkindex");
   }
   
   public RecoveryZkTest() {
@@ -48,6 +57,8 @@ public class RecoveryZkTest extends Full
   
   @Override
   public void doTest() throws Exception {
+    // nocommit: remove the need for this
+    Thread.sleep(5000);
     
     handle.clear();
     handle.put("QTime", SKIPVAL);
@@ -55,9 +66,7 @@ public class RecoveryZkTest extends Full
     
     del("*:*");
     
-    //printLayout();
-    
-    // start an indexing thread
+    // start a couple indexing threads
     
     class StopableThread extends Thread {
       private volatile boolean stop = false;
@@ -73,10 +82,8 @@ public class RecoveryZkTest extends Full
           try {
             indexr(id, i++, i1, 50, tlong, 50, t1,
                 "to come to the aid of their country.");
-          } catch (ThreadDeath td) {
-            throw td;
           } catch (Exception e) {
-            //e.printStackTrace();
+            e.printStackTrace();
           }
         }
       }
@@ -92,35 +99,51 @@ public class RecoveryZkTest extends Full
     
     StopableThread indexThread2 = new StopableThread();
     
-    //indexThread2.start();
+    indexThread2.start();
 
     // give some time to index...
-    Thread.sleep(4000);
+    Thread.sleep(4000);   
     
     // bring shard replica down
     System.out.println("bring shard down");
     JettySolrRunner replica = chaosMonkey.killShard("shard1", 1);
-    
-    // wait a moment
-    Thread.sleep(1000);
 
     
+    // wait a moment - let's allow some docs to be indexed so replication time is non-zero
+    Thread.sleep(4000);
+
     // bring shard replica up
     replica.start();
     
-    // wait for recovery to complete
-    String shard1State = "";
+    final CountDownLatch recoveryLatch = new CountDownLatch(1);
+    RecoveryStrat recoveryStrat = ((SolrDispatchFilter) replica.getDispatchFilter().getFilter()).getCores()
+        .getZkController().getRecoveryStrat();
+    recoveryStrat.setRecoveryListener(new RecoveryListener() {
+      
+      @Override
+      public void startRecovery() {}
+      
+      @Override
+      public void finishedReplication() {}
+      
+      @Override
+      public void finishedRecovery() {
+        recoveryLatch.countDown();
+      }
+    });
+    
     
-    do  {
-      Thread.sleep(1000);
-      updateMappingsFromZk(jettys, clients);
-      ZkNodeProps props = jettyToInfo.get(replica);
-      shard1State = props.get(ZkStateReader.STATE_PROP);
-    } while(!shard1State.equals(ZkStateReader.ACTIVE));
+    // wait for recovery to finish
+    // if it takes over 30 seconds, assume we didn't get our listener attached before
+    // recovery finished
+    recoveryLatch.await(30, TimeUnit.SECONDS);
     
     // stop indexing threads
     indexThread.safeStop();
-    //indexThread2.safeStop();
+    indexThread2.safeStop();
+    
+    indexThread.join();
+    indexThread2.join();
     
     commit();
     
@@ -131,7 +154,7 @@ public class RecoveryZkTest extends Full
     
     assertTrue(client1Docs > 0);
     assertEquals(client1Docs, client2Docs);
-    
+    Thread.sleep(2000);
     // TODO: right now the control and distrib are usually off by a few docs...
     //query("q", "*:*", "distrib", true, "sort", i1 + " desc");
   }

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1209109&r1=1209108&r2=1209109&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
(original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
Thu Dec  1 14:51:22 2011
@@ -124,6 +124,9 @@ public class CloudSolrServer extends Sol
     // TODO: allow multiple collections to be specified via comma separated list
 
     Map<String,Slice> slices = cloudState.getSlices(collection);
+    
+    // nocommit: if slices is null, the collection cannot be found
+    
     Set<String> liveNodes = cloudState.getLiveNodes();
 
     // IDEA: have versions on various things... like a global cloudState version

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1209109&r1=1209108&r2=1209109&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
(original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Thu Dec  1 14:51:22 2011
@@ -192,6 +192,7 @@ public class ZkStateReader {
                 CloudState clusterState = new CloudState(liveNodesSet, cloudState.getCollectionStates());
                 // update volatile
                 cloudState = clusterState;
+                System.out.println("UPDATE CLUSTER STATE TO:" + cloudState);
               }
             } catch (KeeperException e) {
               if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -215,7 +216,7 @@ public class ZkStateReader {
     liveNodeSet.addAll(liveNodes);
     clusterState = CloudState.load(zkClient, liveNodeSet);
     this.cloudState = clusterState;
-    
+    System.out.println("UPDATE CLUSTER STATE TO:" + cloudState);
   }
   
   



Mime
View raw message