lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1245836 - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/handler/admin/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/cloud/...
Date Sat, 18 Feb 2012 02:00:04 GMT
Author: markrmiller
Date: Sat Feb 18 02:00:03 2012
New Revision: 1245836

URL: http://svn.apache.org/viewvc?rev=1245836&view=rev
Log:
SOLR-3126: hardening around peer sync and tests

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sat Feb
18 02:00:03 2012
@@ -49,7 +49,10 @@ public abstract class ElectionContext {
     this.leaderProps = leaderProps;
   }
   
-  abstract void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException;
+  // the given core may or may not be null - if you need access to the current core, you
must pass
+  // the core container and core name to your context impl - then use this core ref if it
is not null
+  // else access it from the core container
+  abstract void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore
core) throws KeeperException, InterruptedException, IOException;
 }
 
 class ShardLeaderElectionContextBase extends ElectionContext {
@@ -71,7 +74,7 @@ class ShardLeaderElectionContextBase ext
   }
 
   @Override
-  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement)
+  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core)
       throws KeeperException, InterruptedException, IOException {
 
     try {
@@ -106,13 +109,19 @@ final class ShardLeaderElectionContext e
   }
   
   @Override
-  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement)
+  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore startupCore)
       throws KeeperException, InterruptedException, IOException {
     if (cc != null) {
       String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
       SolrCore core = null;
       try {
-        core = cc.getCore(coreName);
+        // the first time we are run, we will get a startupCore - after
+        // we will get null and must use cc.getCore
+        if (startupCore == null) {
+          core = cc.getCore(coreName);
+        } else {
+          core = startupCore;
+        }
         if (core == null) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Core not found:" + coreName);
         }
@@ -144,17 +153,17 @@ final class ShardLeaderElectionContext e
         // If I am going to be the leader I have to be active
         
         core.getUpdateHandler().getSolrCoreState().cancelRecovery();
-        zkController.publish(core, ZkStateReader.ACTIVE);
+        zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
         
       } finally {
-        if (core != null) {
+        if (core != null && startupCore == null) {
           core.close();
         }
       }
       
     }
     
-    super.runLeaderProcess(leaderSeqPath, weAreReplacement);
+    super.runLeaderProcess(leaderSeqPath, weAreReplacement, startupCore);
   }
 
   private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
@@ -162,12 +171,12 @@ final class ShardLeaderElectionContext e
     // remove our ephemeral and re join the election
    // System.out.println("sync failed, delete our election node:"
    //     + leaderSeqPath);
-    zkController.publish(core, ZkStateReader.DOWN);
+    zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
     zkClient.delete(leaderSeqPath, -1, true);
     
     core.getUpdateHandler().getSolrCoreState().doRecovery(core);
     
-    leaderElector.joinElection(this);
+    leaderElector.joinElection(this, null);
   }
   
   private boolean shouldIBeLeader(ZkNodeProps leaderProps) {
@@ -215,7 +224,7 @@ final class OverseerElectionContext exte
   }
 
   @Override
-  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException,
InterruptedException {
+  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore firstCore)
throws KeeperException, InterruptedException {
     
     final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
     ZkNodeProps myProps = new ZkNodeProps("id", id);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Sat Feb 18
02:00:03 2012
@@ -30,6 +30,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.core.SolrCore;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -79,12 +80,13 @@ public  class LeaderElector {
    * @param seq
    * @param context 
    * @param replacement has someone else been the leader already?
+   * @param core 
    * @throws KeeperException
    * @throws InterruptedException
    * @throws IOException 
    * @throws UnsupportedEncodingException
    */
-  private void checkIfIamLeader(final String leaderSeqPath, final int seq, final ElectionContext
context, boolean replacement) throws KeeperException,
+  private void checkIfIamLeader(final String leaderSeqPath, final int seq, final ElectionContext
context, boolean replacement, SolrCore core) throws KeeperException,
       InterruptedException, IOException {
     // get all other numbers...
     final String holdElectionPath = context.electionPath + ELECTION_NODE;
@@ -93,7 +95,7 @@ public  class LeaderElector {
     sortSeqs(seqs);
     List<Integer> intSeqs = getSeqs(seqs);
     if (seq <= intSeqs.get(0)) {
-      runIamLeaderProcess(leaderSeqPath, context, replacement);
+      runIamLeaderProcess(leaderSeqPath, context, replacement, core);
     } else {
       // I am not the leader - watch the node below me
       int i = 1;
@@ -117,7 +119,7 @@ public  class LeaderElector {
               public void process(WatchedEvent event) {
                 // am I the next leader?
                 try {
-                  checkIfIamLeader(leaderSeqPath, seq, context, true);
+                  checkIfIamLeader(leaderSeqPath, seq, context, true, null);
                 } catch (InterruptedException e) {
                   // Restore the interrupted status
                   Thread.currentThread().interrupt();
@@ -135,16 +137,16 @@ public  class LeaderElector {
       } catch (KeeperException e) {
         // we couldn't set our watch - the node before us may already be down?
         // we need to check if we are the leader again
-        checkIfIamLeader(leaderSeqPath, seq, context, true);
+        checkIfIamLeader(leaderSeqPath, seq, context, true, null);
       }
     }
   }
 
   // TODO: get this core param out of here
-  protected void runIamLeaderProcess(String leaderSeqPath, final ElectionContext context,
boolean weAreReplacement) throws KeeperException,
+  protected void runIamLeaderProcess(String leaderSeqPath, final ElectionContext context,
boolean weAreReplacement, SolrCore core) throws KeeperException,
       InterruptedException, IOException {
 
-    context.runLeaderProcess(leaderSeqPath, weAreReplacement);
+    context.runLeaderProcess(leaderSeqPath, weAreReplacement, core);
   }
   
   /**
@@ -205,7 +207,7 @@ public  class LeaderElector {
    * @throws IOException 
    * @throws UnsupportedEncodingException
    */
-  public int joinElection(ElectionContext context) throws KeeperException, InterruptedException,
IOException {
+  public int joinElection(ElectionContext context, SolrCore core) throws KeeperException,
InterruptedException, IOException {
     final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
     
     long sessionId = zkClient.getSolrZooKeeper().getSessionId();
@@ -247,7 +249,7 @@ public  class LeaderElector {
       }
     }
     int seq = getSeq(leaderSeqPath);
-    checkIfIamLeader(leaderSeqPath, seq, context, false);
+    checkIfIamLeader(leaderSeqPath, seq, context, false, core);
     
     return seq;
   }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Sat Feb
18 02:00:03 2012
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeoutExcep
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -173,10 +173,13 @@ public class RecoveryStrategy extends Th
     CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderBaseUrl);
     server.setConnectionTimeout(45000);
     server.setSoTimeout(45000);
-    PrepRecovery prepCmd = new PrepRecovery();
+    WaitForState prepCmd = new WaitForState();
     prepCmd.setCoreName(leaderCoreName);
     prepCmd.setNodeName(zkController.getNodeName());
     prepCmd.setCoreNodeName(coreZkNodeName);
+    prepCmd.setState(ZkStateReader.RECOVERING);
+    prepCmd.setCheckLive(true);
+    prepCmd.setPauseFor(4000);
     
     server.request(prepCmd);
     server.shutdown();
@@ -206,7 +209,7 @@ public class RecoveryStrategy extends Th
     while (!succesfulRecovery && !close && !isInterrupted()) { // don't use
interruption or it will close channels though
       try {
         // first thing we just try to sync
-        zkController.publish(core, ZkStateReader.RECOVERING);
+        zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
  
         CloudDescriptor cloudDesc = core.getCoreDescriptor()
             .getCloudDescriptor();

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sat Feb 18
02:00:03 2012
@@ -20,6 +20,7 @@ package org.apache.solr.cloud;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.MalformedURLException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,6 +33,8 @@ import java.util.concurrent.TimeoutExcep
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.CloudState;
@@ -177,22 +180,24 @@ public final class ZkController {
               //Overseer.createClientNodes(zkClient, getNodeName());
 
               ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient,
zkStateReader);
-              overseerElector.joinElection(context);
+              overseerElector.joinElection(context, null);
               zkStateReader.createClusterStateWatchersAndUpdate();
               
               List<CoreDescriptor> descriptors = registerOnReconnect
                   .getCurrentDescriptors();
               if (descriptors != null) {
                 // before registering as live, make sure everyone is in a
-                // recovery state
+                // down state
                 for (CoreDescriptor descriptor : descriptors) {
-                  final String shardZkNodeName = getNodeName() + "_"
+                  final String coreZkNodeName = getNodeName() + "_"
                       + descriptor.getName();
-                  publishAsDown(getBaseUrl(), descriptor, shardZkNodeName,
+                  publishAsDown(getBaseUrl(), descriptor, coreZkNodeName,
                       descriptor.getName());
+                  waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
                 }
               }
               
+
               // we have to register as live first to pick up docs in the buffer
               createEphemeralLiveNode();
               
@@ -218,6 +223,8 @@ public final class ZkController {
             }
 
           }
+
+ 
         });
     cmdExecutor = new ZkCmdExecutor();
     leaderElector = new LeaderElector(zkClient);
@@ -337,7 +344,7 @@ public final class ZkController {
       overseerElector = new LeaderElector(zkClient);
       ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
       overseerElector.setup(context);
-      overseerElector.joinElection(context);
+      overseerElector.joinElection(context, null);
       zkStateReader.createClusterStateWatchersAndUpdate();
       
     } catch (IOException e) {
@@ -478,30 +485,23 @@ public final class ZkController {
     String shardId = cloudDesc.getShardId();
 
     Map<String,String> props = new HashMap<String,String>();
+ // we only put a subset of props into the leader node
     props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
     props.put(ZkStateReader.CORE_NAME_PROP, coreName);
     props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
-    props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
+
 
     if (log.isInfoEnabled()) {
         log.info("Register shard - core:" + coreName + " address:"
             + baseUrl + " shardId:" + shardId);
     }
 
-    // we only put a subset of props into the leader node
-    ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
-        props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
-        props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.NODE_NAME_PROP,
-        props.get(ZkStateReader.NODE_NAME_PROP));
-    
-
-    joinElection(collection, coreZkNodeName, shardId, leaderProps);
+    ZkNodeProps leaderProps = new ZkNodeProps(props);
     
     // rather than look in the cluster state file, we go straight to the zknodes
     // here, because on cluster restart there could be stale leader info in the
     // cluster state node that won't be updated for a moment
-    String leaderUrl = getLeaderUrl(collection, cloudDesc.getShardId());
+    String leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
     
     // now wait until our currently cloud state contains the latest leader
     String cloudStateLeader = zkStateReader.getLeaderUrl(collection, cloudDesc.getShardId(),
30000);
@@ -573,7 +573,7 @@ public final class ZkController {
   }
   
   /**
-   * Get leader URL directly from zk nodes.
+   * Get leader props directly from zk nodes.
    * 
    * @param collection
    * @param slice
@@ -581,7 +581,7 @@ public final class ZkController {
    * @throws KeeperException
    * @throws InterruptedException
    */
-  private String getLeaderUrl(final String collection, final String slice)
+  private ZkCoreNodeProps getLeaderProps(final String collection, final String slice)
       throws KeeperException, InterruptedException {
     int iterCount = 60;
     while (iterCount-- > 0)
@@ -591,7 +591,7 @@ public final class ZkController {
             true);
         ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(
             ZkNodeProps.load(data));
-        return leaderProps.getCoreUrl();
+        return leaderProps;
       } catch (NoNodeException e) {
         Thread.sleep(500);
       }
@@ -600,12 +600,12 @@ public final class ZkController {
 
 
   private void joinElection(final String collection,
-      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps) throws InterruptedException,
KeeperException, IOException {
+      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps, SolrCore core)
throws InterruptedException, KeeperException, IOException {
     ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
         collection, shardZkNodeName, leaderProps, this, cc);
     
     leaderElector.setup(context);
-    leaderElector.joinElection(context);
+    leaderElector.joinElection(context, core);
   }
 
 
@@ -671,15 +671,14 @@ public final class ZkController {
     publishState(cd, shardZkNodeName, coreName, finalProps);
   }
 
-  public void publish(SolrCore core, String state) {
-    CoreDescriptor cd = core.getCoreDescriptor();
+  public void publish(CoreDescriptor cd, String state) {
     Map<String,String> finalProps = new HashMap<String,String>();
     finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
-    finalProps.put(ZkStateReader.CORE_NAME_PROP, core.getName());
+    finalProps.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
     finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
     finalProps.put(ZkStateReader.STATE_PROP, state);
-    publishState(cd, getNodeName() + "_" + core.getName(),
-        core.getName(), finalProps);
+    publishState(cd, getNodeName() + "_" + cd.getName(),
+        cd.getName(), finalProps);
   }
   
   void publishAsDown(String baseUrl,
@@ -959,4 +958,93 @@ public final class ZkController {
     uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
   }
 
+  public void preRegisterSetup(SolrCore core, CoreDescriptor cd) {
+    // before becoming available, make sure we are not live and active
+    // this also gets us our assigned shard id if it was not specified
+    publish(cd, ZkStateReader.DOWN);
+    
+    String shardId = cd.getCloudDescriptor().getShardId();
+    
+    Map<String,String> props = new HashMap<String,String>();
+    // we only put a subset of props into the leader node
+    props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
+    props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
+    props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    
+    final String coreZkNodeName = getNodeName() + "_" + cd.getName();
+    ZkNodeProps ourProps = new ZkNodeProps(props);
+    String collection = cd.getCloudDescriptor()
+        .getCollectionName();
+    
+    try {
+      joinElection(collection, coreZkNodeName, shardId, ourProps, core);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    } catch (KeeperException e) {
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    } catch (IOException e) {
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    }
+
+      
+      waitForLeaderToSeeDownState(cd, coreZkNodeName);
+    
+  }
+
+  private ZkCoreNodeProps waitForLeaderToSeeDownState(
+      CoreDescriptor descriptor, final String shardZkNodeName) {
+    CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
+    String collection = cloudDesc.getCollectionName();
+    String shard = cloudDesc.getShardId();
+    ZkCoreNodeProps leaderProps;
+    try {
+      // go straight to zk, not the cloud state - we must have current info
+      leaderProps = getLeaderProps(collection, shard);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    } catch (KeeperException e) {
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    }
+    
+    String leaderBaseUrl = leaderProps.getBaseUrl();
+    String leaderCoreName = leaderProps.getCoreName();
+    
+    String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(),
+        descriptor.getName());
+    
+    boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
+    if (!isLeader && !SKIP_AUTO_RECOVERY) {
+      // wait until the leader sees us as down before we are willing to accept
+      // updates.
+      CommonsHttpSolrServer server = null;
+      try {
+        server = new CommonsHttpSolrServer(leaderBaseUrl);
+      } catch (MalformedURLException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      }
+      server.setConnectionTimeout(45000);
+      server.setSoTimeout(45000);
+      WaitForState prepCmd = new WaitForState();
+      prepCmd.setCoreName(leaderCoreName);
+      prepCmd.setNodeName(getNodeName());
+      prepCmd.setCoreNodeName(shardZkNodeName);
+      prepCmd.setState(ZkStateReader.DOWN);
+      prepCmd.setCheckLive(false);
+      
+      try {
+        server.request(prepCmd);
+      } catch (Exception e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not talk to the leader", e);
+      }
+      server.shutdown();
+    }
+    return leaderProps;
+  }
+
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Sat Feb 18
02:00:03 2012
@@ -539,9 +539,8 @@ public class CoreContainer 
     }
 
     if (zkController != null) {
-      // before becoming available, make sure we are not live and active
-      // this also gets us our assigned shard id if it was not specified
-      zkController.publish(core, ZkStateReader.DOWN);
+      // this happens before we can receive requests
+      zkController.preRegisterSetup(core, core.getCoreDescriptor());
     }
     
     SolrCore old = null;
@@ -587,7 +586,7 @@ public class CoreContainer 
       } catch (Exception e) {
         // if register fails, this is really bad - close the zkController to
         // minimize any damage we can cause
-        zkController.publish(core, ZkStateReader.DOWN);
+        zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
         log.error("", e);
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
             e);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
(original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
Sat Feb 18 02:00:03 2012
@@ -178,7 +178,7 @@ public class CoreAdminHandler extends Re
         }
 
         case PREPRECOVERY: {
-          this.handlePrepRecoveryAction(req, rsp);
+          this.handleWaitForStateAction(req, rsp);
           break;
         }
         
@@ -614,7 +614,7 @@ public class CoreAdminHandler extends Re
     }
   }
   
-  protected void handlePrepRecoveryAction(SolrQueryRequest req,
+  protected void handleWaitForStateAction(SolrQueryRequest req,
       SolrQueryResponse rsp) throws IOException, InterruptedException {
     final SolrParams params = req.getParams();
     
@@ -625,8 +625,9 @@ public class CoreAdminHandler extends Re
     
     String nodeName = params.get("nodeName");
     String coreNodeName = params.get("coreNodeName");
-    
- 
+    String waitForState = params.get("state");
+    boolean checkLive = params.getBool("checkLive", true);
+    int pauseFor = params.getInt("pauseFor", 0);
     SolrCore core =  null;
 
     try {
@@ -653,16 +654,19 @@ public class CoreAdminHandler extends Re
           
           state = nodeProps.get(ZkStateReader.STATE_PROP);
           live = cloudState.liveNodesContain(nodeName);
-          if (nodeProps != null && state.equals(ZkStateReader.RECOVERING)
-              && live) {
-            break;
+          if (nodeProps != null && state.equals(waitForState)) {
+            if (checkLive && live) {
+              break;
+            } else {
+              break;
+            }
           }
         }
         
         if (retry++ == 30) {
           throw new SolrException(ErrorCode.BAD_REQUEST,
-              "I was asked to prep for recovery for " + nodeName
-                  + " but she is not live or not in a recovery state - state: " + state +
" live:" + live);
+              "I was asked to wait on state " + waitForState + " for " + nodeName
+                  + " but I still do not see the request state. I see state: " + state +
" live:" + live);
         }
         
         Thread.sleep(1000);
@@ -672,21 +676,7 @@ public class CoreAdminHandler extends Re
       // kept it from sending the update to be buffered -
       // pause for a while to let any outstanding updates finish
       
-      Thread.sleep(4000);
-      
-      UpdateRequestProcessorChain processorChain = core
-          .getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
-      
-      ModifiableSolrParams reqParams = new ModifiableSolrParams(req.getParams());
-      reqParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, "true");
-      
-      SolrQueryRequest sqr = new LocalSolrQueryRequest(core, reqParams);
-      UpdateRequestProcessor processor = processorChain.createProcessor(sqr,
-          new SolrQueryResponse());
-      CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
-      
-      processor.processCommit(cuc);
-      processor.finish();
+      Thread.sleep(pauseFor);
       
       // solrcloud_debug
 //      try {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
(original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Sat Feb 18 02:00:03 2012
@@ -170,7 +170,7 @@ public class DistributedUpdateProcessor 
           forwardToLeader = false;
           List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
               .getReplicaProps(collection, shardId, zkController.getNodeName(),
-                  coreName);
+                  coreName, null, ZkStateReader.DOWN);
           if (replicaProps != null) {
             nodes = new ArrayList<Node>(replicaProps.size());
             for (ZkCoreNodeProps props : replicaProps) {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
(original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
Sat Feb 18 02:00:03 2012
@@ -44,7 +44,6 @@ public abstract class AbstractDistribute
     super.setUp();
     log.info("####SETUP_START " + getName());
     createTempDir();
-    ignoreException("java.nio.channels.ClosedChannelException");
     
     String zkDir = testDir.getAbsolutePath() + File.separator
     + "zookeeper/server1/data";
@@ -84,13 +83,13 @@ public abstract class AbstractDistribute
   }
   
   protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader,
boolean verbose)
-      throws KeeperException, InterruptedException {
-    waitForRecoveriesToFinish(collection, zkStateReader, verbose, false);
+      throws Exception {
+    waitForRecoveriesToFinish(collection, zkStateReader, verbose, true);
   }
   
   protected void waitForRecoveriesToFinish(String collection,
       ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
-      throws KeeperException, InterruptedException {
+      throws Exception {
     boolean cont = true;
     int cnt = 0;
     
@@ -117,12 +116,13 @@ public abstract class AbstractDistribute
           }
         }
       }
-      if (!sawLiveRecovering || cnt == 15) {
+      if (!sawLiveRecovering || cnt == 120) {
         if (!sawLiveRecovering) {
           if (verbose) System.out.println("no one is recoverying");
         } else {
           if (failOnTimeout) {
             fail("There are still nodes recoverying");
+            printLayout();
             return;
           }
           if (verbose) System.out
@@ -130,7 +130,7 @@ public abstract class AbstractDistribute
         }
         cont = false;
       } else {
-        Thread.sleep(2000);
+        Thread.sleep(1000);
       }
       cnt++;
     }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
(original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
Sat Feb 18 02:00:03 2012
@@ -48,7 +48,7 @@ public class ChaosMonkeySafeLeaderTest e
   public void setUp() throws Exception {
     super.setUp();
     // we expect this time of exception as shards go up and down...
-    ignoreException(".*");
+    //ignoreException(".*");
     
     // sometimes we cannot get the same port
     ignoreException("java\\.net\\.BindException: Address already in use");

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Sat Feb
18 02:00:03 2012
@@ -648,7 +648,7 @@ public class FullSolrCloudTest extends A
   }
   
   protected void waitForRecoveriesToFinish(boolean verbose)
-      throws KeeperException, InterruptedException {
+      throws Exception {
     super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose);
   }
   

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Sat
Feb 18 02:00:03 2012
@@ -110,7 +110,7 @@ public class LeaderElectionTest extends 
         
         try {
           elector.setup(context);
-          seq = elector.joinElection(context);
+          seq = elector.joinElection(context, null);
           electionDone = true;
           seqToThread.put(seq, this);
         } catch (InterruptedException e) {
@@ -153,7 +153,7 @@ public class LeaderElectionTest extends 
     ElectionContext context = new ShardLeaderElectionContextBase(elector,
         "shard2", "collection1", "dummynode1", props, zkStateReader);
     elector.setup(context);
-    elector.joinElection(context);
+    elector.joinElection(context, null);
     assertEquals("http://127.0.0.1/solr/",
         getLeaderUrl("collection1", "shard2"));
   }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Sat Feb 18
02:00:03 2012
@@ -41,6 +41,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -100,6 +101,13 @@ public class OverseerTest extends SolrTe
   
   @BeforeClass
   public static void beforeClass() throws Exception {
+    System.setProperty("solrcloud.skip.autorecovery", "true");
+    initCore();
+  }
+  
+  @AfterClass
+  public static void afterClass() throws Exception {
+    System.clearProperty("solrcloud.skip.autorecovery");
     initCore();
   }
 
@@ -143,9 +151,7 @@ public class OverseerTest extends SolrTe
         collection1Desc.setCollectionName("collection1");
         CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
         desc1.setCloudDescriptor(collection1Desc);
-        zkController.publishAsDown(zkController.getBaseUrl(), desc1,
-            zkController.getNodeName() + "_" + "core" + (i + 1), "core"
-                + (i + 1));
+        zkController.preRegisterSetup(null, desc1);
         ids[i] = zkController.register("core" + (i + 1), desc1);
       }
       
@@ -242,10 +248,7 @@ public class OverseerTest extends SolrTe
             final CoreDescriptor desc = new CoreDescriptor(null, coreName, "");
             desc.setCloudDescriptor(collection1Desc);
             try {
-              controllers[slot % nodeCount].publishAsDown(controllers[slot
-                  % nodeCount].getBaseUrl(), desc, controllers[slot
-                  % nodeCount].getNodeName()
-                  + "_" + coreName, coreName);
+              controllers[slot % nodeCount].preRegisterSetup(null, desc);
               ids[slot] = controllers[slot % nodeCount]
                   .register(coreName, desc);
             } catch (Throwable e) {
@@ -673,7 +676,7 @@ public class OverseerTest extends SolrTe
     LeaderElector overseerElector = new LeaderElector(zkClient);
     ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient,
reader);
     overseerElector.setup(ec);
-    overseerElector.joinElection(ec);
+    overseerElector.joinElection(ec, null);
     return zkClient;
   }
 }
\ No newline at end of file

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
(original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
Sat Feb 18 02:00:03 2012
@@ -115,11 +115,14 @@ public class CoreAdminRequest extends So
 
   }
   
-  public static class PrepRecovery extends CoreAdminRequest {
+  public static class WaitForState extends CoreAdminRequest {
     protected String nodeName;
     protected String coreNodeName;
-
-    public PrepRecovery() {
+    protected String state;
+    protected Boolean checkLive;
+    protected Integer pauseFor;
+    
+    public WaitForState() {
       action = CoreAdminAction.PREPRECOVERY;
     }
     
@@ -139,6 +142,30 @@ public class CoreAdminRequest extends So
       this.coreNodeName = coreNodeName;
     }
     
+    public String getState() {
+      return state;
+    }
+
+    public void setState(String state) {
+      this.state = state;
+    }
+
+    public Boolean getCheckLive() {
+      return checkLive;
+    }
+
+    public void setCheckLive(Boolean checkLive) {
+      this.checkLive = checkLive;
+    }
+    
+    public Integer getPauseFor() {
+      return pauseFor;
+    }
+
+    public void setPauseFor(Integer pauseFor) {
+      this.pauseFor = pauseFor;
+    }
+    
     @Override
     public SolrParams getParams() {
       if( action == null ) {
@@ -156,6 +183,18 @@ public class CoreAdminRequest extends So
       if (coreNodeName != null) {
         params.set( "coreNodeName", coreNodeName);
       }
+      
+      if (state != null) {
+        params.set( "state", state);
+      }
+      
+      if (checkLive != null) {
+        params.set( "checkLive", checkLive);
+      }
+      
+      if (pauseFor != null) {
+        params.set( "pauseFor", pauseFor);
+      }
 
       return params;
     }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1245836&r1=1245835&r2=1245836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Sat
Feb 18 02:00:03 2012
@@ -420,7 +420,12 @@ public class ZkStateReader {
   }
   
   public List<ZkCoreNodeProps> getReplicaProps(String collection,
-      String shardId, String thisNodeName, String coreName, String stateFilter) {
+      String shardId, String thisNodeName, String coreName, String mustMatchStateFilter)
{
+    return getReplicaProps(collection, shardId, thisNodeName, coreName, mustMatchStateFilter,
null);
+  }
+  
+  public List<ZkCoreNodeProps> getReplicaProps(String collection,
+      String shardId, String thisNodeName, String coreName, String mustMatchStateFilter,
String mustNotMatchStateFilter) {
     CloudState cloudState = this.cloudState;
     if (cloudState == null) {
       return null;
@@ -444,8 +449,10 @@ public class ZkStateReader {
       ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
       String coreNodeName = nodeProps.getNodeName() + "_" + nodeProps.getCoreName();
       if (cloudState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(filterNodeName))
{
-        if (stateFilter == null || stateFilter.equals(nodeProps.getState())) {
-          nodes.add(nodeProps);
+        if (mustMatchStateFilter == null || mustMatchStateFilter.equals(nodeProps.getState()))
{
+          if (mustNotMatchStateFilter == null || !mustNotMatchStateFilter.equals(nodeProps.getState()))
{
+            nodes.add(nodeProps);
+          }
         }
       }
     }



Mime
View raw message