lucene-commits mailing list archives

From thelabd...@apache.org
Subject svn commit: r1593312 [1/2] - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/client/solrj/embedded/ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/java/org/apache/solr/update/ core/src/java/org/ap...
Date Thu, 08 May 2014 15:48:26 GMT
Author: thelabdude
Date: Thu May  8 15:48:26 2014
New Revision: 1593312

URL: http://svn.apache.org/r1593312
Log:
SOLR-5495: Hardening recovery scenarios after the leader receives an error trying to forward an update request to a replica.

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SocketProxy.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1593312&r1=1593311&r2=1593312&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu May  8 15:48:26 2014
@@ -82,6 +82,10 @@ Other Changes
 * SOLR-5868: HttpClient should be configured to use ALLOW_ALL_HOSTNAME hostname
   verifier to simplify SSL setup. (Steve Davids via Mark Miller)
 
+* SOLR-5495: Recovery strategy for leader partitioned from replica case. Hardening
+  recovery scenarios after the leader receives an error trying to forward an
+  update request to a replica. (Timothy Potter)
+
 ==================  4.9.0 ==================
 
 Versions of Major Components

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1593312&r1=1593311&r2=1593312&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Thu May  8 15:48:26 2014
@@ -95,6 +95,8 @@ public class JettySolrRunner {
   private LinkedList<FilterHolder> extraFilters;
 
   private SSLConfig sslConfig;
+  
+  private int proxyPort = -1;
 
   public static class DebugFilter implements Filter {
     public int requestsToKeep = 10;
@@ -477,7 +479,7 @@ public class JettySolrRunner {
     if (0 == conns.length) {
       throw new RuntimeException("Jetty Server has no Connectors");
     }
-    return conns[0].getLocalPort();
+    return (proxyPort != -1) ? proxyPort : conns[0].getLocalPort();
   }
   
   /**
@@ -489,7 +491,16 @@ public class JettySolrRunner {
     if (lastPort == -1) {
       throw new IllegalStateException("You cannot get the port until this instance has started");
     }
-    return lastPort;
+    return (proxyPort != -1) ? proxyPort : lastPort;
+  }
+  
+  /**
+   * Sets the port of a local socket proxy that sits in front of this server; if set
+   * then all client traffic will flow through the proxy, giving us the ability to
+   * simulate network partitions very easily.
+   */
+  public void setProxyPort(int proxyPort) {
+    this.proxyPort = proxyPort;
   }
 
   /**

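The proxyPort hook above exists so a test can stand a local TCP proxy between clients and this Jetty instance and then break that link to simulate a network partition; that is the role of the SocketProxy test class added by this commit. As a rough, self-contained illustration of the idea (not the SocketProxy implementation itself; every name below is hypothetical), a minimal one-connection forwarder could look like:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical, simplified stand-in for the idea behind SocketProxy:
// listen on a local port, forward one connection to the real server
// (client-to-server direction only, for brevity), and let the test
// close the listener to simulate a partition.
public class TinySocketProxy {
  private final ServerSocket listener;
  private final String targetHost = "127.0.0.1";
  private final int targetPort;

  public TinySocketProxy(int targetPort) throws Exception {
    this.listener = new ServerSocket(0);   // pick any free local port
    this.targetPort = targetPort;
  }

  public int getListenPort() {
    return listener.getLocalPort();        // the value a test would hand to setProxyPort(...)
  }

  public void serveOneConnection() throws Exception {
    try (Socket client = listener.accept();
         Socket server = new Socket(targetHost, targetPort)) {
      pump(client.getInputStream(), server.getOutputStream());
    }
  }

  // Simulate a network partition: clients can no longer reach the server through us.
  public void close() throws Exception {
    listener.close();
  }

  private static void pump(InputStream in, OutputStream out) throws Exception {
    byte[] buf = new byte[8192];
    int n;
    while ((n = in.read(buf)) != -1) {
      out.write(buf, 0, n);
      out.flush();
    }
  }
}

A test would pass the proxy's listen port to the server via setProxyPort(proxy.getListenPort()) so the port getters patched above advertise the proxied port, and later call close() on the proxy to cut clients off from that replica.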
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1593312&r1=1593311&r2=1593312&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Thu May  8 15:48:26 2014
@@ -3,8 +3,6 @@ package org.apache.solr.cloud;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -26,7 +24,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /*
@@ -144,9 +143,8 @@ class ShardLeaderElectionContextBase ext
         ZkStateReader.CORE_NAME_PROP,
         leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
         ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
-    Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
-  }
-  
+    Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));    
+  }  
 }
 
 // add core container and stop passing core around...
@@ -287,9 +285,11 @@ final class ShardLeaderElectionContext e
       core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
     }
 
+    boolean isLeader = true;
     try {
       super.runLeaderProcess(weAreReplacement, 0);
     } catch (Exception e) {
+      isLeader = false;
       SolrException.log(log, "There was a problem trying to register as the leader", e);
   
       try (SolrCore core = cc.getCore(coreName)) {
@@ -305,7 +305,72 @@ final class ShardLeaderElectionContext e
         rejoinLeaderElection(leaderSeqPath, core);
       }
     }
-    
+
+    if (isLeader) {
+      // check for any replicas in my shard that were set to down by the previous leader
+      try {
+        startLeaderInitiatedRecoveryOnReplicas(coreName);
+      } catch (Exception exc) {
+        // don't want leader election to fail because of
+        // an error trying to tell others to recover
+      }
+    }    
+  }
+  
+  private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
+    try (SolrCore core = cc.getCore(coreName)) {
+      CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+      String coll = cloudDesc.getCollectionName();
+      String shardId = cloudDesc.getShardId(); 
+      String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
+      List<String> replicas = null;
+      try {
+        replicas = zkClient.getChildren(znodePath, null, false);
+      } catch (NoNodeException nne) {
+        // this can be ignored
+      }
+      
+      if (replicas != null && replicas.size() > 0) {
+        for (String replicaCore : replicas) {
+          
+          if (coreName.equals(replicaCore))
+            continue; // added safe-guard so we don't mark this core as down
+          
+          String lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCore);
+          if (ZkStateReader.DOWN.equals(lirState) || ZkStateReader.RECOVERY_FAILED.equals(lirState)) {
+            log.info("After "+coreName+" was elected leader, found "+
+               replicaCore+" as "+lirState+" and needing recovery.");
+            
+            List<ZkCoreNodeProps> replicaProps = 
+                zkController.getZkStateReader().getReplicaProps(
+                    collection, shardId, coreName, replicaCore, null, null);
+            
+            if (replicaProps != null && replicaProps.size() > 0) {                
+              ZkCoreNodeProps coreNodeProps = null;
+              for (ZkCoreNodeProps p : replicaProps) {
+                if (p.getCoreName().equals(replicaCore)) {
+                  coreNodeProps = p;
+                  break;
+                }
+              }
+              
+              LeaderInitiatedRecoveryThread lirThread = 
+                  new LeaderInitiatedRecoveryThread(zkController,
+                                                    cc,
+                                                    collection,
+                                                    shardId,
+                                                    coreNodeProps,
+                                                    120);
+              zkController.ensureReplicaInLeaderInitiatedRecovery(
+                  collection, shardId, replicaCore, coreNodeProps, false);
+              
+              ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
+              executor.execute(lirThread);
+            }              
+          }
+        }
+      }
+    } // core gets closed automagically    
   }
 
   private void waitForReplicasToComeUp(boolean weAreReplacement, int timeoutms) throws InterruptedException {
@@ -373,7 +438,7 @@ final class ShardLeaderElectionContext e
   }
 
   private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
-    log.info("Checking if I should try and be the leader.");
+    log.info("Checking if I ("+core.getName()+") should try and be the leader.");
     
     if (isClosed) {
       log.info("Bailing on leader process because we have been closed");
@@ -386,8 +451,18 @@ final class ShardLeaderElectionContext e
       return true;
     }
     
-    if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
-        .equals(ZkStateReader.ACTIVE)) {
+    if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished().equals(ZkStateReader.ACTIVE)) {
+      
+      // maybe active but if the previous leader marked us as down and
+      // we haven't recovered, then can't be leader
+      String lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId, core.getName());
+      if (ZkStateReader.DOWN.equals(lirState) || ZkStateReader.RECOVERING.equals(lirState)) {
+        log.warn("Although my last published state is Active, the previous leader marked me "+core.getName()
+            + " as " + lirState
+            + " and I haven't recovered yet, so I shouldn't be the leader.");
+        return false;
+      }
+      
       log.info("My last published State was Active, it's okay to be the leader.");
       return true;
     }

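The net effect of the shouldIBeLeader() change is one extra guard: a core whose last published state is active must still decline leadership while the previous leader's leader-initiated recovery (LIR) flag marks it down or recovering. A hypothetical, standalone reduction of just that decision (state names inlined as plain strings; the real code uses ZkStateReader constants and asks ZkController for the LIR state):

// Hypothetical, simplified mirror of the guard added to shouldIBeLeader():
// a candidate whose last published state is "active" still defers if the
// previous leader flagged it "down" or "recovering" via leader-initiated recovery.
final class LeaderCandidateCheck {
  static boolean okToBeLeader(String lastPublishedState, String lirState) {
    if (!"active".equals(lastPublishedState)) {
      return false; // not active locally, so don't volunteer here
    }
    boolean markedByPreviousLeader =
        "down".equals(lirState) || "recovering".equals(lirState);
    return !markedByPreviousLeader;
  }

  public static void main(String[] args) {
    System.out.println(okToBeLeader("active", null));         // true: no LIR flag set
    System.out.println(okToBeLeader("active", "down"));       // false: must recover first
    System.out.println(okToBeLeader("active", "recovering")); // false: still recovering
  }
}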
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1593312&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Thu May  8 15:48:26 2014
@@ -0,0 +1,234 @@
+package org.apache.solr.cloud;
+
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.util.List;
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Background daemon thread that tries to send the REQUESTRECOVERY command to a downed
+ * replica; used by a shard leader to nag a replica into recovering after the
+ * leader experiences an error trying to send an update request to the replica.
+ */
+public class LeaderInitiatedRecoveryThread extends Thread {
+
+  public final static Logger log = LoggerFactory.getLogger(LeaderInitiatedRecoveryThread.class);
+
+  protected ZkController zkController;
+  protected CoreContainer coreContainer;
+  protected String collection;
+  protected String shardId;
+  protected ZkCoreNodeProps nodeProps;
+  protected int maxTries;
+  
+  public LeaderInitiatedRecoveryThread(ZkController zkController, 
+                                       CoreContainer cc, 
+                                       String collection, 
+                                       String shardId, 
+                                       ZkCoreNodeProps nodeProps,
+                                       int maxTries)
+  {
+    super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
+    this.zkController = zkController;
+    this.coreContainer = cc;
+    this.collection = collection;
+    this.shardId = shardId;    
+    this.nodeProps = nodeProps;
+    this.maxTries = maxTries;
+    
+    setDaemon(true);
+  }
+  
+  public void run() {
+    long startMs = System.currentTimeMillis();
+    try {
+      sendRecoveryCommandWithRetry();
+    } catch (Exception exc) {
+      log.error(getName()+" failed due to: "+exc, exc);
+      if (exc instanceof SolrException) {
+        throw (SolrException)exc;
+      } else {
+        throw new SolrException(ErrorCode.SERVER_ERROR, exc);
+      }
+    }
+    long diffMs = (System.currentTimeMillis() - startMs);
+    log.info(getName()+" completed successfully after running for "+Math.round(diffMs/1000L)+" secs");    
+  }
+  
+  protected void sendRecoveryCommandWithRetry() throws Exception {    
+    int tries = 0;
+    long waitBetweenTriesMs = 5000L;
+    boolean continueTrying = true;
+        
+    String recoveryUrl = nodeProps.getBaseUrl();
+    String replicaNodeName = nodeProps.getNodeName();
+    String coreNeedingRecovery = nodeProps.getCoreName();
+    String replicaUrl = nodeProps.getCoreUrl();
+    
+    log.info(getName()+" started running to send REQUESTRECOVERY command to "+replicaUrl+
+        "; will try for a max of "+(maxTries * (waitBetweenTriesMs/1000))+" secs");
+
+    RequestRecovery recoverRequestCmd = new RequestRecovery();
+    recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
+    recoverRequestCmd.setCoreName(coreNeedingRecovery);
+    
+    while (continueTrying && ++tries < maxTries) {
+      if (tries > 1) {
+        log.warn("Asking core "+coreNeedingRecovery+" on  " + recoveryUrl + 
+            " to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...");              
+      } else {
+        log.info("Asking core "+coreNeedingRecovery+" on  " + recoveryUrl + " to recover");              
+      }
+      
+      HttpSolrServer server = new HttpSolrServer(recoveryUrl);
+      try {
+        server.setSoTimeout(60000);
+        server.setConnectionTimeout(15000);
+        try {
+          server.request(recoverRequestCmd);
+          
+          log.info("Successfully sent "+CoreAdminAction.REQUESTRECOVERY+
+              " command to core "+coreNeedingRecovery+" on "+recoveryUrl);
+          
+          continueTrying = false; // succeeded, so stop looping
+        } catch (Throwable t) {
+          Throwable rootCause = SolrException.getRootCause(t);
+          boolean wasCommError =
+              (rootCause instanceof ConnectException ||
+                  rootCause instanceof ConnectTimeoutException ||
+                  rootCause instanceof NoHttpResponseException ||
+                  rootCause instanceof SocketException);
+
+          SolrException.log(log, recoveryUrl + ": Could not tell a replica to recover", t);
+          
+          if (!wasCommError) {
+            continueTrying = false;
+          }                                                
+        }
+      } finally {
+        server.shutdown();
+      }
+      
+      // wait a few seconds
+      if (continueTrying) {
+        try {
+          Thread.sleep(waitBetweenTriesMs);
+        } catch (InterruptedException ignoreMe) {
+          Thread.currentThread().interrupt();          
+        }
+        
+        if (coreContainer.isShutDown()) {
+          log.warn("Stop trying to send recovery command to downed replica "+coreNeedingRecovery+
+              " on "+replicaNodeName+" because my core container is shutdown.");
+          continueTrying = false;
+          break;
+        }
+        
+        // see if the replica's node is still live, if not, no need to keep doing this loop
+        ZkStateReader zkStateReader = zkController.getZkStateReader();
+        try {
+          zkStateReader.updateClusterState(true);
+        } catch (Exception exc) {
+          log.warn("Error when updating cluster state: "+exc);
+        }        
+        
+        if (!zkStateReader.getClusterState().liveNodesContain(replicaNodeName)) {
+          log.warn("Node "+replicaNodeName+" hosting core "+coreNeedingRecovery+
+              " is no longer live. No need to keep trying to tell it to recover!");
+          continueTrying = false;
+          break;
+        }
+
+        // additional safeguard against the replica trying to be in the active state
+        // before acknowledging the leader initiated recovery command
+        if (continueTrying && collection != null && shardId != null) {
+          try {
+            // call out to ZooKeeper to get the leader-initiated recovery state
+            String lirState = 
+                zkController.getLeaderInitiatedRecoveryState(collection, shardId, coreNeedingRecovery);
+            
+            if (lirState == null) {
+              log.warn("Stop trying to send recovery command to downed replica "+coreNeedingRecovery+
+                  " on "+replicaNodeName+" because the znode no longer exists.");
+              continueTrying = false;
+              break;              
+            }
+            
+            if (ZkStateReader.RECOVERING.equals(lirState)) {
+              // replica has ack'd leader initiated recovery and entered the recovering state
+              // so we don't need to keep looping to send the command
+              continueTrying = false;  
+              log.info("Replica "+coreNeedingRecovery+
+                  " on node "+replicaNodeName+" ack'd the leader initiated recovery state, "
+                      + "no need to keep trying to send recovery command");
+            } else {
+              String leaderCoreNodeName = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName();
+              List<ZkCoreNodeProps> replicaProps = 
+                  zkStateReader.getReplicaProps(collection, shardId, leaderCoreNodeName, coreNeedingRecovery, null, null);
+              if (replicaProps != null && replicaProps.size() > 0) {
+                String replicaState = replicaProps.get(0).getState();
+                if (ZkStateReader.ACTIVE.equals(replicaState)) {
+                  // replica published its state as "active", 
+                  // which is bad if lirState is still "down"
+                  if (ZkStateReader.DOWN.equals(lirState)) {
+                    // OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
+                    // so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
+                    log.warn("Replica "+coreNeedingRecovery+" set to active but the leader thinks it should be in recovery;"
+                        + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0));
+                    zkController.ensureReplicaInLeaderInitiatedRecovery(collection, 
+                        shardId, replicaUrl, nodeProps, true); // force republish state to "down"
+                  }
+                }                    
+              }                    
+            }                  
+          } catch (Exception ignoreMe) {
+            log.warn("Failed to determine state of "+coreNeedingRecovery+" due to: "+ignoreMe);
+            // eventually this loop will exhaust max tries and stop so we can just log this for now
+          }                
+        }
+      }
+    }
+    
+    // replica is no longer in recovery on this node (may be handled on another node)
+    zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
+    
+    if (continueTrying) {
+      // ugh! this means the loop timed out before the recovery command could be delivered
+      // how exotic do we want to get here?
+      log.error("Timed out after waiting for "+(tries * (waitBetweenTriesMs/1000))+
+          " secs to send the recovery request to: "+replicaUrl+"; not much more we can do here?");
+      
+      // TODO: need to raise a JMX event to allow monitoring tools to take over from here
+      
+    }    
+  }
+}

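Stripped of the Solr-specific calls, LeaderInitiatedRecoveryThread above is a bounded retry loop on a daemon thread: attempt delivery once, and before each retry re-check the reasons to give up (delivered, core container shut down, replica's node no longer live, replica already acknowledged by entering recovery). A generic sketch of that pattern, with hypothetical attempt/shouldStop hooks standing in for the real checks:

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical, generic reduction of the retry pattern used by
// LeaderInitiatedRecoveryThread: a daemon thread that retries an action a
// bounded number of times, sleeping between attempts and re-checking
// "give up" conditions before each retry.
public class BoundedRetryThread extends Thread {
  private final BooleanSupplier attempt;     // returns true once the command was delivered
  private final BooleanSupplier shouldStop;  // e.g. container shut down / target node gone
  private final int maxTries;
  private final long waitBetweenTriesMs;

  public BoundedRetryThread(String name, BooleanSupplier attempt, BooleanSupplier shouldStop,
                            int maxTries, long waitBetweenTriesMs) {
    super(name);
    this.attempt = attempt;
    this.shouldStop = shouldStop;
    this.maxTries = maxTries;
    this.waitBetweenTriesMs = waitBetweenTriesMs;
    setDaemon(true); // like the real thread, don't keep the JVM alive for this
  }

  @Override
  public void run() {
    for (int tries = 1; tries <= maxTries; tries++) {
      if (attempt.getAsBoolean()) {
        return; // delivered, stop looping
      }
      if (shouldStop.getAsBoolean()) {
        return; // no point retrying (shutdown, node not live, already acknowledged, ...)
      }
      try {
        TimeUnit.MILLISECONDS.sleep(waitBetweenTriesMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
    // retries exhausted; the real thread logs an error here and relies on other safeguards
  }
}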
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1593312&r1=1593311&r2=1593312&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Thu May  8 15:48:26 2014
@@ -351,6 +351,7 @@ public class RecoveryStrategy extends Th
           return;
         }
         
+        log.info("Publishing state of core "+core.getName()+" as recovering, leader is "+leaderUrl+" and I am "+ourUrl);
         zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
         
         
@@ -608,6 +609,9 @@ public class RecoveryStrategy extends Th
       }
       HttpUriRequestResponse mrr = server.httpUriRequest(prepCmd);
       prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
+      
+      log.info("Sending prep recovery command to {}; {}", leaderBaseUrl, prepCmd.toString());
+      
       mrr.future.get();
     } finally {
       server.shutdown();

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1593312&r1=1593311&r2=1593312&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Thu May  8 15:48:26 2014
@@ -17,6 +17,28 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.URLEncoder;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
@@ -46,34 +68,13 @@ import org.apache.solr.handler.component
 import org.apache.solr.update.UpdateLog;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.URLEncoder;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 /**
  * Handle ZooKeeper interactions.
  * 
@@ -174,6 +175,9 @@ public final class ZkController {
 
   private volatile boolean isClosed;
   
+  // keeps track of replicas that have been asked to recover by leaders running on this node
+  private Map<String,String> replicasInLeaderInitiatedRecovery = new HashMap<String,String>();  
+  
   public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
         String localHostContext, int leaderVoteWait, int leaderConflictResolveWait, boolean genericCoreNodeNames, final CurrentCoreDescriptorProvider registerOnReconnect) 
       throws InterruptedException, TimeoutException, IOException
@@ -807,6 +811,7 @@ public final class ZkController {
           Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
               .getUpdateLog().recoverFromLog();
           if (recoveryFuture != null) {
+            log.info("Replaying tlog for "+ourUrl+" during startup... NOTE: This can take a while.");
             recoveryFuture.get(); // NOTE: this could potentially block for
             // minutes or more!
             // TODO: public as recovering in the mean time?
@@ -981,6 +986,14 @@ public final class ZkController {
         core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
         return true;
       }
+      
+      // see if the leader told us to recover
+      String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreName);
+      if (ZkStateReader.DOWN.equals(lirState)) {
+        log.info("Leader marked core "+core.getName()+" down; starting recovery process");
+        core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
+        return true;        
+      }
     } else {
       log.info("I am the leader, no recovery necessary");
     }
@@ -1023,7 +1036,30 @@ public final class ZkController {
     
     assert collection != null && collection.length() > 0;
     
-    String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+    String shardId = cd.getCloudDescriptor().getShardId();
+    String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();    
+    // If the leader initiated recovery, then verify that this replica has performed
+    // recovery as requested before becoming active; don't even look at lirState if going down
+    if (!ZkStateReader.DOWN.equals(state)) {
+      String lirState = getLeaderInitiatedRecoveryState(collection, shardId, cd.getName());
+      if (lirState != null) {
+        if ("active".equals(state)) {
+          // trying to become active, so leader-initiated state must be recovering
+          if (ZkStateReader.RECOVERING.equals(lirState)) {
+            updateLeaderInitiatedRecoveryState(collection, shardId, cd.getName(), ZkStateReader.ACTIVE);
+          } else if (ZkStateReader.DOWN.equals(lirState)) {
+            throw new SolrException(ErrorCode.INVALID_STATE, 
+                "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
+          }
+        } else if (ZkStateReader.RECOVERING.equals(state)) {
+          // if it is currently DOWN, then trying to enter into recovering state is good
+          if (ZkStateReader.DOWN.equals(lirState)) {
+            updateLeaderInitiatedRecoveryState(collection, shardId, cd.getName(), ZkStateReader.RECOVERING);
+          }
+        }
+      }
+    }
+    
     //assert cd.getCloudDescriptor().getShardId() != null;
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state", 
         ZkStateReader.STATE_PROP, state, 
@@ -1040,7 +1076,7 @@ public final class ZkController {
     }
     overseerJobQueue.offer(ZkStateReader.toJSON(m));
   }
-
+  
   private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
       final ClusterState state, final String coreNodeName) {
 
@@ -1450,49 +1486,69 @@ public final class ZkController {
     String leaderBaseUrl = leaderProps.getBaseUrl();
     String leaderCoreName = leaderProps.getCoreName();
     
-    String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(),
-        descriptor.getName());
+    String myCoreName = descriptor.getName();
+    String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), myCoreName);
     
     boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
     if (!isLeader && !SKIP_AUTO_RECOVERY) {
-      HttpSolrServer server = null;
-      server = new HttpSolrServer(leaderBaseUrl);
+      
+      // detect if this core is in leader-initiated recovery and if so, 
+      // then we don't need the leader to wait on seeing the down state
+      String lirState = null;
       try {
-        server.setConnectionTimeout(15000);
-        server.setSoTimeout(120000);
-        WaitForState prepCmd = new WaitForState();
-        prepCmd.setCoreName(leaderCoreName);
-        prepCmd.setNodeName(getNodeName());
-        prepCmd.setCoreNodeName(coreZkNodeName);
-        prepCmd.setState(ZkStateReader.DOWN);
+        lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreName);
+      } catch (Exception exc) {
+        log.error("Failed to determine if replica "+myCoreName+
+            " is in leader-initiated recovery due to: "+exc, exc);
+      }
+      
+      if (lirState != null) {
+        log.info("Replica "+myCoreName+
+            " is already in leader-initiated recovery, so not waiting for leader to see down state.");
+      } else {
         
-        // let's retry a couple times - perhaps the leader just went down,
-        // or perhaps he is just not quite ready for us yet
-        retries = 6;
-        for (int i = 0; i < retries; i++) {
-          if (isClosed) {
-            throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-                "We have been closed");
-          }
-          try {
-            server.request(prepCmd);
-            break;
-          } catch (Exception e) {
-            SolrException.log(log,
-                "There was a problem making a request to the leader", e);
-            try {
-              Thread.sleep(2000);
-            } catch (InterruptedException e1) {
-              Thread.currentThread().interrupt();
+        log.info("Replica "+myCoreName+
+            " NOT in leader-initiated recovery, need to wait for leader to see down state.");
+            
+        HttpSolrServer server = null;
+        server = new HttpSolrServer(leaderBaseUrl);
+        try {
+          server.setConnectionTimeout(15000);
+          server.setSoTimeout(120000);
+          WaitForState prepCmd = new WaitForState();
+          prepCmd.setCoreName(leaderCoreName);
+          prepCmd.setNodeName(getNodeName());
+          prepCmd.setCoreNodeName(coreZkNodeName);
+          prepCmd.setState(ZkStateReader.DOWN);
+          
+          // let's retry a couple times - perhaps the leader just went down,
+          // or perhaps he is just not quite ready for us yet
+          retries = 6;
+          for (int i = 0; i < retries; i++) {
+            if (isClosed) {
+              throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+                  "We have been closed");
             }
-            if (i == retries - 1) {
-              throw new SolrException(ErrorCode.SERVER_ERROR,
-                  "There was a problem making a request to the leader");
+            try {
+              server.request(prepCmd);
+              break;
+            } catch (Exception e) {
+              SolrException.log(log,
+                  "There was a problem making a request to the leader", e);
+              try {
+                Thread.sleep(2000);
+              } catch (InterruptedException e1) {
+                Thread.currentThread().interrupt();
+              }
+              if (i == retries - 1) {
+                throw new SolrException(ErrorCode.SERVER_ERROR,
+                    "There was a problem making a request to the leader");
+              }
             }
           }
+        } finally {
+          server.shutdown();
         }
-      } finally {
-        server.shutdown();
       }
     }
     return leaderProps;
@@ -1669,5 +1725,161 @@ public final class ZkController {
   CoreContainer getCoreContainer(){
     return cc;
   }
+      
+  /**
+   * When a leader receives a communication error when trying to send a request to a replica,
+   * it calls this method to ensure the replica enters recovery when connectivity is restored.
+   * 
+   * returns true if the node hosting the replica is still considered "live" by ZooKeeper;
+   * false means the node is not live either, so no point in trying to send recovery commands
+   * to it.
+   */
+  public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection, 
+      final String shardId, final String replicaUrl, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState) 
+          throws KeeperException, InterruptedException 
+  {
+    
+    // First, determine if this replica is already in recovery handling
+    // which is needed because there can be many concurrent errors flooding in
+    // about the same replica having trouble and we only need to send the "needs"
+    // recovery signal once
+    boolean nodeIsLive = true;
+    synchronized (replicasInLeaderInitiatedRecovery) {
+      if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {     
+        if (!forcePublishState) {
+          return false; // already in this recovery process
+        }
+      }
+      
+      // if the replica's state is not DOWN right now, make it so ...        
+      String replicaNodeName = replicaCoreProps.getNodeName();      
+      String replicaCoreName = replicaCoreProps.getCoreName();      
+      assert replicaCoreName != null : "No core name for replica "+replicaNodeName;
+      
+      // we only really need to try to send the recovery command if the node itself is "live"
+      if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
+        replicasInLeaderInitiatedRecovery.put(replicaUrl, 
+            getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreName));          
+        // create a znode that requires the replica to "ack" that it knows it was out-of-sync
+        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreName, ZkStateReader.DOWN);
+      } else {
+        nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
+      }      
+    }    
+    
+    String replicaCoreName = replicaCoreProps.getCoreName();    
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state", 
+        ZkStateReader.STATE_PROP, ZkStateReader.DOWN, 
+        ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(), 
+        ZkStateReader.CORE_NAME_PROP, replicaCoreProps.getCoreName(),
+        ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
+        ZkStateReader.SHARD_ID_PROP, shardId,
+        ZkStateReader.COLLECTION_PROP, collection);
+    log.warn("Leader is publishing core={} state={} on behalf of un-reachable replica {}; forcePublishState? "+forcePublishState,
+        replicaCoreName, ZkStateReader.DOWN, replicaUrl);
+    overseerJobQueue.offer(ZkStateReader.toJSON(m));
+    
+    return nodeIsLive;
+  }  
+  
+  public boolean isReplicaInRecoveryHandling(String replicaUrl) {
+    boolean exists = false;
+    synchronized (replicasInLeaderInitiatedRecovery) {
+      exists = replicasInLeaderInitiatedRecovery.containsKey(replicaUrl);
+    }
+    return exists;
+  }
+  
+  public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
+    synchronized(replicasInLeaderInitiatedRecovery) {
+      replicasInLeaderInitiatedRecovery.remove(replicaUrl);           
+    }
+  }  
+  
+  public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreName) {
+    
+    if (collection == null || shardId == null || coreName == null)
+      return null; // if we don't have complete data about a core in cloud mode, return null
+    
+    String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreName);    
+    String state = null;
+    try {
+      byte[] data = zkClient.getData(znodePath, null, new Stat(), false);
+      if (data != null && data.length > 0)
+        state = new String(data, "UTF-8");
+    } catch (NoNodeException ignoreMe) {
+      // safe to ignore as this znode will only exist if the leader initiated recovery
+    } catch (ConnectionLossException cle) {
+      // sort of safe to ignore ??? Usually these are seen when the core is going down
+      // or there are bigger issues to deal with than reading this znode
+      log.warn("Unable to read "+znodePath+" due to: "+cle);
+    } catch (SessionExpiredException see) {
+      // sort of safe to ignore ??? Usually these are seen when the core is going down
+      // or there are bigger issues to deal with than reading this znode
+      log.warn("Unable to read "+znodePath+" due to: "+see);
+    } catch (UnsupportedEncodingException e) {
+      throw new Error("JVM Does not seem to support UTF-8", e);
+    } catch (Exception exc) {
+      log.error("Failed to read data from znode "+znodePath+" due to: "+exc);
+      if (exc instanceof SolrException) {
+        throw (SolrException)exc;
+      } else {
+        throw new SolrException(ErrorCode.SERVER_ERROR, 
+            "Failed to read data from znodePath: "+znodePath, exc);
+      }
+    }
+    return state;
+  }
+  
+  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreName, String state) {
+    if (collection == null || shardId == null || coreName == null) {
+      log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
+          "; shardId="+shardId+"; coreName="+coreName);
+      return; // if we don't have complete data about a core in cloud mode, do nothing
+    }
 
+    String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreName);
+    
+    if (ZkStateReader.ACTIVE.equals(state)) {
+      // since we're marking it active, we don't need this znode anymore, so delete instead of update
+      try {
+        zkClient.delete(znodePath, -1, false);
+      } catch (Exception justLogIt) {
+        log.warn("Failed to delete znode "+znodePath+" due to: "+justLogIt);
+      }
+      return;
+    }
+    
+    byte[] znodeData = null;
+    try {
+      znodeData = state.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new Error("JVM Does not seem to support UTF-8", e);
+    }
+
+    boolean retryOnConnLoss = true; // be a little more robust when trying to write data
+    try {
+      if (zkClient.exists(znodePath, retryOnConnLoss)) {
+        zkClient.setData(znodePath, znodeData, retryOnConnLoss);
+      } else {
+        zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
+      }
+      log.info("Wrote "+state+" to "+znodePath);
+    } catch (Exception exc) {
+      if (exc instanceof SolrException) {
+        throw (SolrException)exc;
+      } else {
+        throw new SolrException(ErrorCode.SERVER_ERROR, 
+            "Failed to update data to "+state+" for znode: "+znodePath, exc);        
+      }
+    }
+  }
+  
+  public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
+    return "/collections/"+collection+"/leader_initiated_recovery/"+shardId;
+  }  
+  
+  public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreName) {
+    return getLeaderInitiatedRecoveryZnodePath(collection, shardId)+"/"+coreName;
+  }
 }

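The leader-initiated recovery bookkeeping added to ZkController reduces to one znode per flagged replica at /collections/<collection>/leader_initiated_recovery/<shardId>/<coreName>, whose payload is the requested state string: the leader writes "down", the replica advances it to "recovering" when it publishes that state, and the znode is deleted once the replica publishes "active". A hedged, illustrative sketch of that convention against a bare ZooKeeper client; this is not the ZkController code, and it assumes the parent znodes already exist where noted:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.nio.charset.StandardCharsets;

// Illustrative sketch (not ZkController itself) of the leader-initiated-recovery
// znode convention: one znode per replica whose bytes are the requested state.
public class LirZnodeSketch {
  static String lirPath(String collection, String shardId, String coreName) {
    // mirrors getLeaderInitiatedRecoveryZnodePath(...) in the diff above
    return "/collections/" + collection + "/leader_initiated_recovery/" + shardId + "/" + coreName;
  }

  // Leader side: flag the replica as "down" so it must recover before going active again.
  static void markDown(ZooKeeper zk, String collection, String shardId, String coreName)
      throws KeeperException, InterruptedException {
    String path = lirPath(collection, shardId, coreName);
    byte[] data = "down".getBytes(StandardCharsets.UTF_8);
    if (zk.exists(path, false) != null) {
      zk.setData(path, data, -1);
    } else {
      // NOTE: assumes parent znodes exist; ZkController uses makePath(), which creates them
      zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
  }

  // Replica side: read the flag (null means the leader never asked for recovery).
  static String readState(ZooKeeper zk, String collection, String shardId, String coreName)
      throws KeeperException, InterruptedException {
    try {
      byte[] data = zk.getData(lirPath(collection, shardId, coreName), false, null);
      return data == null ? null : new String(data, StandardCharsets.UTF_8);
    } catch (KeeperException.NoNodeException nne) {
      return null;
    }
  }

  // Once the replica has recovered and publishes "active", the flag is simply deleted.
  static void clear(ZooKeeper zk, String collection, String shardId, String coreName)
      throws KeeperException, InterruptedException {
    zk.delete(lirPath(collection, shardId, coreName), -1);
  }
}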
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1593312&r1=1593311&r2=1593312&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Thu May  8 15:48:26 2014
@@ -845,7 +845,7 @@ public class CoreAdminHandler extends Re
   protected void handleRequestRecoveryAction(SolrQueryRequest req,
       SolrQueryResponse rsp) throws IOException {
     final SolrParams params = req.getParams();
-    log.info("It has been requested that we recover");
+    log.info("It has been requested that we recover: core="+params.get(CoreAdminParams.CORE));
     Thread thread = new Thread() {
       @Override
       public void run() {
@@ -958,7 +958,8 @@ public class CoreAdminHandler extends Re
     Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
 
     log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
-        + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader);
+        + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader
+        + ", onlyIfLeaderActive: "+onlyIfLeaderActive);
 
     int maxTries = 0; 
     String state = null;
@@ -1014,10 +1015,28 @@ public class CoreAdminHandler extends Re
               live = clusterState.liveNodesContain(nodeName);
               
               String localState = cloudDescriptor.getLastPublished();
+
+              // TODO: This is funky but I've seen this in testing where the replica asks the
+              // leader to be in recovery? Need to track down how that happens ... in the meantime,
+              // this is a safeguard 
+              boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null && 
+                  onlyIfLeader && 
+                  core.getName().equals(nodeProps.getStr("core")) &&
+                  ZkStateReader.RECOVERING.equals(waitForState) && 
+                  ZkStateReader.ACTIVE.equals(localState) && 
+                  ZkStateReader.ACTIVE.equals(state));
+              
+              if (leaderDoesNotNeedRecovery) {
+                log.warn("Leader "+core.getName()+" ignoring request to be in the recovering state because it is live and active.");
+              }              
               
               boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && (localState == null || !localState.equals(ZkStateReader.ACTIVE));
+              log.info("In WaitForState("+waitForState+"): collection="+collection+", shard="+slice.getName()+
+                  ", isLeader? "+core.getCoreDescriptor().getCloudDescriptor().isLeader()+
+                  ", live="+live+", currentState="+state+", localState="+localState+", nodeName="+nodeName+
+                  ", coreNodeName="+coreNodeName+", onlyIfActiveCheckResult="+onlyIfActiveCheckResult+", nodeProps: "+nodeProps);
 
-              if (!onlyIfActiveCheckResult && nodeProps != null && state.equals(waitForState)) {
+              if (!onlyIfActiveCheckResult && nodeProps != null && (state.equals(waitForState) || leaderDoesNotNeedRecovery)) {
                 if (checkLive == null) {
                   break;
                 } else if (checkLive && live) {
@@ -1040,7 +1059,7 @@ public class CoreAdminHandler extends Re
             collection = cloudDescriptor.getCollectionName();
             shardId = cloudDescriptor.getShardId();
             leaderInfo = coreContainer.getZkController().
-                getZkStateReader().getLeaderUrl(collection, shardId, 0);
+                getZkStateReader().getLeaderUrl(collection, shardId, 5000);
           } catch (Exception exc) {
             leaderInfo = "Not available due to: " + exc;
           }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1593312&r1=1593311&r2=1593312&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Thu May  8 15:48:26 2014
@@ -176,7 +176,7 @@ public class SolrCmdDistributor {
     for (Node node : nodes) {
       UpdateRequest uReq = new UpdateRequest();
       uReq.setParams(params);
-      uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+      uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);      
       submit(new Req(cmd.toString(), node, uReq, synchronous));
     }
     
@@ -230,6 +230,7 @@ public class SolrCmdDistributor {
           + req.node.getUrl() + " retry:"
           + req.retries + " " + req.cmdString + " params:" + req.uReq.getParams());
     }
+    
     try {
       SolrServer solrServer = servers.getSolrServer(req);
       NamedList<Object> rsp = solrServer.request(req.uReq);
@@ -258,6 +259,13 @@ public class SolrCmdDistributor {
       this.synchronous = synchronous;
       this.cmdString = cmdString;
     }
+    
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("SolrCmdDistributor$Req: cmd=").append(String.valueOf(cmdString));
+      sb.append("; node=").append(String.valueOf(node));
+      return sb.toString();
+    }
   }
     
 
@@ -272,6 +280,14 @@ public class SolrCmdDistributor {
     public Exception e;
     public int statusCode = -1;
     public Req req;
+    
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("SolrCmdDistributor$Error: statusCode=").append(statusCode);
+      sb.append("; exception=").append(String.valueOf(e));
+      sb.append("; req=").append(String.valueOf(req));
+      return sb.toString();
+    }
   }
   
   public static abstract class Node {
@@ -284,11 +300,27 @@ public class SolrCmdDistributor {
 
   public static class StdNode extends Node {
     protected ZkCoreNodeProps nodeProps;
+    protected String collection;
+    protected String shardId;
 
     public StdNode(ZkCoreNodeProps nodeProps) {
+      this(nodeProps, null, null);
+    }
+    
+    public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {    
       this.nodeProps = nodeProps;
+      this.collection = collection;
+      this.shardId = shardId;
     }
     
+    public String getCollection() {
+      return collection;
+    }
+    
+    public String getShardId() {
+      return shardId;
+    }
+        
     @Override
     public String getUrl() {
       return nodeProps.getCoreUrl();
@@ -359,11 +391,9 @@ public class SolrCmdDistributor {
   public static class RetryNode extends StdNode {
     
     private ZkStateReader zkStateReader;
-    private String collection;
-    private String shardId;
     
     public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
-      super(nodeProps);
+      super(nodeProps, collection, shardId);
       this.zkStateReader = zkStateReader;
       this.collection = collection;
       this.shardId = shardId;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1593312&r1=1593311&r2=1593312&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu May  8 15:48:26 2014
@@ -20,6 +20,8 @@ package org.apache.solr.update.processor
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
 import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,12 +35,15 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.DistributedQueue;
+import org.apache.solr.cloud.LeaderInitiatedRecoveryThread;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
@@ -64,7 +69,9 @@ import org.apache.solr.common.params.Sol
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.component.RealTimeGetComponent;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
@@ -160,7 +167,7 @@ public class DistributedUpdateProcessor 
   private List<Node> nodes;
 
   private UpdateCommand updateCommand;  // the current command this processor is working on.
-  
+    
   public DistributedUpdateProcessor(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
     super(next);
@@ -213,7 +220,7 @@ public class DistributedUpdateProcessor 
       }
       String coreName = req.getCore().getName();
 
-      ClusterState cstate = zkController.getClusterState();
+      ClusterState cstate = zkController.getClusterState();      
       DocCollection coll = cstate.getCollection(collection);
       Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);
 
@@ -299,10 +306,10 @@ public class DistributedUpdateProcessor 
                 boolean skip = skipListSet.contains(props.getCoreUrl());
                 log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip);
                 if (!skip) {
-                  nodes.add(new StdNode(props));
+                    nodes.add(new StdNode(props, collection, shardId));
                 }
               } else {
-                nodes.add(new StdNode(props));
+                  nodes.add(new StdNode(props, collection, shardId));
               }
             }
           }
@@ -375,7 +382,7 @@ public class DistributedUpdateProcessor 
           if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName()))  {
             if (nodes == null) nodes = new ArrayList<>();
             ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
-            nodes.add(new StdNode(nodeProps));
+            nodes.add(new StdNode(nodeProps, coll.getName(), shardId));
           }
         }
       }
@@ -471,6 +478,7 @@ public class DistributedUpdateProcessor 
 
     String from = req.getParams().get(DISTRIB_FROM);
     ClusterState clusterState = zkController.getClusterState();
+        
     CloudDescriptor cloudDescriptor = req.getCore().getCoreDescriptor().getCloudDescriptor();
     Slice mySlice = clusterState.getSlice(collection, cloudDescriptor.getShardId());
     boolean localIsLeader = cloudDescriptor.isLeader();
@@ -528,7 +536,7 @@ public class DistributedUpdateProcessor 
       if (replicaProps != null) {
         nodes = new ArrayList<>(replicaProps.size());
         for (ZkCoreNodeProps props : replicaProps) {
-          nodes.add(new StdNode(props));
+          nodes.add(new StdNode(props, collection, shardId));
         }
       }
     } catch (InterruptedException e) {
@@ -553,7 +561,7 @@ public class DistributedUpdateProcessor 
     }
 
     boolean dropCmd = false;
-    if (!forwardToLeader) {
+    if (!forwardToLeader) {      
       dropCmd = versionAdd(cmd);
     }
 
@@ -582,6 +590,7 @@ public class DistributedUpdateProcessor 
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
         params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
             zkController.getBaseUrl(), req.getCore().getName()));
+        
         params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
         params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
         for (Node nodesByRoutingRule : nodesByRoutingRules) {
@@ -600,7 +609,6 @@ public class DistributedUpdateProcessor 
                   DistribPhase.TOLEADER.toString()));
       params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
           zkController.getBaseUrl(), req.getCore().getName()));
-
       cmdDistrib.distribAdd(cmd, nodes, params);
     }
     
@@ -655,46 +663,66 @@ public class DistributedUpdateProcessor 
     // legit
 
     for (final SolrCmdDistributor.Error error : errors) {
+      
       if (error.req.node instanceof RetryNode) {
         // we don't try to force a leader to recover
         // when we cannot forward to it
         continue;
       }
-      // TODO: we should force their state to recovering ??
-      // TODO: do retries??
-      // TODO: what if its is already recovering? Right now recoveries queue up -
-      // should they?
-      final String recoveryUrl = error.req.node.getBaseUrl();
+       
+      DistribPhase phase =
+          DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));       
+      if (phase != DistribPhase.FROMLEADER)
+        continue; // don't have non-leaders try to recover other nodes
       
-      Thread thread = new Thread() {
-        {
-          setDaemon(true);
-        }
-        @Override
-        public void run() {
-          log.info("try and ask " + recoveryUrl + " to recover");
-          HttpSolrServer server = new HttpSolrServer(recoveryUrl);
-          try {
-            server.setSoTimeout(60000);
-            server.setConnectionTimeout(15000);
-            
-            RequestRecovery recoverRequestCmd = new RequestRecovery();
-            recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
-            recoverRequestCmd.setCoreName(error.req.node.getCoreName());
-            try {
-              server.request(recoverRequestCmd);
-            } catch (Throwable t) {
-              SolrException.log(log, recoveryUrl
-                  + ": Could not tell a replica to recover", t);
-            }
-          } finally {
-            server.shutdown();
-          }
+      final String replicaUrl = error.req.node.getUrl();      
+
+      int maxTries = 1;       
+      boolean sendRecoveryCommand = true;
+      String collection = null;
+      String shardId = null;
+      
+      if (error.req.node instanceof StdNode) {
+        StdNode stdNode = (StdNode)error.req.node;
+        collection = stdNode.getCollection();
+        shardId = stdNode.getShardId();
+        try {
+          // if false, then the node is probably not "live" anymore
+          sendRecoveryCommand = 
+              zkController.ensureReplicaInLeaderInitiatedRecovery(collection, 
+                                                                  shardId, 
+                                                                  replicaUrl, 
+                                                                  stdNode.getNodeProps(), 
+                                                                  false);
+          
+          // we want to try more than once, ~10 minutes
+          if (sendRecoveryCommand) {
+            maxTries = 120;
+          } // else the node is no longer "live" so no need to send any recovery command
+          
+        } catch (Exception e) {
+          log.error("Leader failed to set replica "+
+              error.req.node.getUrl()+" state to DOWN due to: "+e, e);
         }
-      };
-      ExecutorService executor = req.getCore().getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getUpdateExecutor();
-      executor.execute(thread);
+      } // else not a StdNode, recovery command still gets sent once
+            
+      if (!sendRecoveryCommand)
+        continue; // the replica is already in recovery handling or is not live   
+      
+      Throwable rootCause = SolrException.getRootCause(error.e);      
+      log.error("Setting up to try to start recovery on replica "+replicaUrl+" after: "+rootCause);
       
+      // try to send the recovery command to the downed replica in a background thread
+      CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();      
+      LeaderInitiatedRecoveryThread lirThread = 
+          new LeaderInitiatedRecoveryThread(zkController,
+                                            coreContainer,
+                                            collection,
+                                            shardId,
+                                            error.req.node.getNodeProps(),
+                                            maxTries);
+      ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
+      executor.execute(lirThread);      
     }
   }
 
@@ -1151,7 +1179,7 @@ public class DistributedUpdateProcessor 
           // don't forward to ourself
           leaderForAnyShard = true;
         } else {
-          leaders.add(new StdNode(coreLeaderProps));
+          leaders.add(new StdNode(coreLeaderProps, collection, sliceName));
         }
       }
 
@@ -1254,7 +1282,7 @@ public class DistributedUpdateProcessor 
           if (replicaProps != null) {
             List<Node> myReplicas = new ArrayList<>();
             for (ZkCoreNodeProps replicaProp : replicaProps) {
-              myReplicas.add(new StdNode(replicaProp));
+              myReplicas.add(new StdNode(replicaProp, collection, myShardId));
             }
             cmdDistrib.distribDelete(cmd, myReplicas, params);
             someReplicas = true;
@@ -1521,7 +1549,7 @@ public class DistributedUpdateProcessor 
       for (Entry<String,Replica> entry : shardMap.entrySet()) {
         ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
         if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
-          urls.add(new StdNode(nodeProps));
+          urls.add(new StdNode(nodeProps, collection, replicas.getName()));
         }
       }
     }
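
The hunks above change what the leader does when it gets an error forwarding an update to a replica. Instead of unconditionally firing a single one-shot recovery request from an ad-hoc thread, the leader now skips errors for RetryNode targets and for requests that were not in the FROMLEADER phase, asks ZkController.ensureReplicaInLeaderInitiatedRecovery(...) to flag the replica (which also reports whether the node is still live), and only then schedules the new LeaderInitiatedRecoveryThread with a bounded number of retries. The following is only a minimal, self-contained sketch of that decision flow; ForwardError, RecoveryCoordinator and RecoveryTask are simplified stand-ins invented here for illustration, not the actual Solr types used in the patch.

import java.util.List;
import java.util.concurrent.ExecutorService;

class LeaderErrorHandlingSketch {

  enum Phase { FROMLEADER, TOLEADER, NONE }

  // Simplified stand-in for SolrCmdDistributor.Error plus the StdNode fields used here.
  static class ForwardError {
    Phase phase;            // distrib phase of the failed request
    boolean isRetryNode;    // true when the failed target was a leader we forwarded to
    String collection;
    String shardId;
    String replicaUrl;
    Object nodeProps;       // opaque replica properties
  }

  // Stand-in for the ZkController call that marks the replica "down" in ZK and
  // reports whether the node is still live.
  interface RecoveryCoordinator {
    boolean markReplicaDownIfLive(String collection, String shardId,
                                  String replicaUrl, Object nodeProps) throws Exception;
  }

  // Stand-in for LeaderInitiatedRecoveryThread: asks the replica to recover,
  // retrying up to maxTries times in the background.
  static class RecoveryTask implements Runnable {
    final ForwardError error;
    final int maxTries;
    RecoveryTask(ForwardError error, int maxTries) {
      this.error = error;
      this.maxTries = maxTries;
    }
    @Override
    public void run() {
      // would send a REQUESTRECOVERY core admin command to error.replicaUrl,
      // retrying up to maxTries times
    }
  }

  static void handleErrors(List<ForwardError> errors, RecoveryCoordinator zk,
                           ExecutorService updateExecutor) {
    for (ForwardError err : errors) {
      if (err.isRetryNode) {
        continue; // we don't try to force a leader to recover when we cannot forward to it
      }
      if (err.phase != Phase.FROMLEADER) {
        continue; // only the leader initiates recovery of its replicas
      }

      boolean sendRecoveryCommand = true;
      int maxTries = 1;
      try {
        sendRecoveryCommand = zk.markReplicaDownIfLive(
            err.collection, err.shardId, err.replicaUrl, err.nodeProps);
        if (sendRecoveryCommand) {
          maxTries = 120; // keep asking for roughly 10 minutes
        }
      } catch (Exception e) {
        // log and fall through; a single recovery request is still attempted
      }
      if (!sendRecoveryCommand) {
        continue; // replica is not live or recovery handling is already underway
      }

      updateExecutor.execute(new RecoveryTask(err, maxTries)); // background, non-blocking
    }
  }
}

Handing the retries to a task on the update executor keeps the request thread unblocked, while the cap of 120 tries bounds how long a leader keeps asking a partitioned replica to recover (roughly 10 minutes in the real thread).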

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1593312&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Thu May  8 15:48:26 2014
@@ -0,0 +1,545 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.JSONTestUtil;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simulates HTTP partitions between a leader and replica but the replica does
+ * not lose its ZooKeeper connection.
+ */
+@Slow
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
+  
+  private static final transient Logger log = 
+      LoggerFactory.getLogger(HttpPartitionTest.class);
+  
+  // To prevent the test assertions firing too fast before cluster state
+  // recognizes (and propagates) partitions
+  private static final long sleepMsBeforeHealPartition = 1000L;
+  
+  private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
+  private AtomicInteger portCounter = new AtomicInteger(0);
+  private int basePort = 49900;
+  
+  public HttpPartitionTest() {
+    super();
+    sliceCount = 2;
+    shardCount = 2;
+  }
+  
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    System.setProperty("numShards", Integer.toString(sliceCount));
+  }
+  
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    if (!proxies.isEmpty()) {
+      for (SocketProxy proxy : proxies.values()) {
+        proxy.close();
+      }
+    }
+    
+    System.clearProperty("numShards");
+    
+    try {
+      super.tearDown();
+    } catch (Exception exc) {}
+    
+    resetExceptionIgnores();
+  }
+  
+  /**
+   * Overrides the parent implementation so that we can configure a socket proxy
+   * to sit in front of each Jetty server, which gives us the ability to simulate
+   * network partitions without having to fuss with IPTables (which is not very
+   * cross-platform friendly).
+   */
+  @Override
+  public JettySolrRunner createJetty(File solrHome, String dataDir,
+      String shardList, String solrConfigOverride, String schemaOverride)
+      throws Exception {
+    
+    int jettyPort = basePort + portCounter.incrementAndGet();
+    
+    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
+        jettyPort, solrConfigOverride, schemaOverride, false,
+        getExtraServlets(), sslConfig, getExtraRequestFilters());
+    jetty.setShards(shardList);
+    jetty.setDataDir(getDataDir(dataDir));
+    
+    // set up to proxy HTTP requests to this server unless it is the
+    // control server
+    int proxyPort = basePort + portCounter.incrementAndGet();
+    jetty.setProxyPort(proxyPort);
+    
+    jetty.start();
+    
+    // create a socket proxy for the jetty server ...
+    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
+    proxies.put(proxy.getUrl(), proxy);
+    
+    return jetty;
+  }
+   
+  @Override
+  public void doTest() throws Exception {
+    waitForThingsToLevelOut(30000);
+    
+    // test a 1x2 collection
+    testRf2();
+
+    // now do similar for a 1x3 collection while taking 2 replicas on-and-off
+    // each time
+    testRf3();
+
+    // kill a leader and make sure recovery occurs as expected
+    testRf3WithLeaderFailover();    
+  }
+  
+  protected void testRf2() throws Exception {
+    // create a collection that has 1 shard but 2 replicas
+    String testCollectionName = "c8n_1x2";
+    createCollection(testCollectionName, 1, 2, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+    
+    sendDoc(1);
+    
+    Replica notLeader = 
+        ensureAllReplicasAreActive(testCollectionName, 2, 10).get(0);
+    
+    // ok, now introduce a network partition between the leader and the replica
+    SocketProxy proxy = getProxyForReplica(notLeader);
+    
+    proxy.close();
+    
+    // indexing during a partition
+    sendDoc(2);
+    
+    // Have the partition last at least 1 sec
+    // While this gives the impression that recovery is timing related, this is
+    // really only to give time for the state to be written to ZK before the test
+    // completes. In other words, without a brief pause, the test finishes so
+    // quickly that it doesn't give time for the recovery process to kick-in
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    proxy.reopen();
+    
+    List<Replica> notLeaders = 
+        ensureAllReplicasAreActive(testCollectionName, 2, 20); // shouldn't take 20 secs but just to be safe
+    
+    sendDoc(3);
+    
+    // sent 3 docs in so far, verify they are on the leader and replica
+    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3);
+        
+    // now up the stakes and do more docs
+    int numDocs = 1000;
+    boolean hasPartition = false;
+    for (int d = 0; d < numDocs; d++) {
+      // create / restore partition every 100 docs
+      if (d % 100 == 0) {
+        if (hasPartition) {
+          proxy.reopen();
+          hasPartition = false;
+        } else {
+          if (d >= 100) {
+            proxy.close();
+            hasPartition = true;
+            Thread.sleep(sleepMsBeforeHealPartition);
+          }
+        }
+      }
+      sendDoc(d + 4); // 4 is offset as we've already indexed 1-3
+    }
+    
+    // restore connectivity if lost
+    if (hasPartition) {
+      proxy.reopen();
+    }
+    
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, 2, 20);
+    
+    // verify all docs received
+    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, numDocs + 3);
+  }
+  
+  protected void testRf3() throws Exception {
+    // create a collection that has 1 shard and 3 replicas
+    String testCollectionName = "c8n_1x3";
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+    
+    sendDoc(1);
+    
+    List<Replica> notLeaders = 
+        ensureAllReplicasAreActive(testCollectionName, 3, 10);
+    assertTrue("Expected 2 replicas for collection " + testCollectionName
+        + " but found " + notLeaders.size() + "; clusterState: "
+        + cloudClient.getZkStateReader().getClusterState(),
+        notLeaders.size() == 2);
+    
+    sendDoc(1);
+    
+    // ok, now introduce a network partition between the leader and the replica
+    SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+    
+    proxy0.close();
+    
+    // indexing during a partition
+    sendDoc(2);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    proxy0.reopen();
+    
+    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
+    
+    proxy1.close();
+    
+    sendDoc(3);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    proxy1.reopen();
+    
+    // sent 4 docs in so far, verify they are on the leader and replica
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, 20); 
+    
+    sendDoc(4);
+    
+    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);    
+  }
+  
+  protected void testRf3WithLeaderFailover() throws Exception {
+    // now let's create a partition in one of the replicas and outright
+    // kill the leader ... see what happens
+    // create a collection that has 1 shard and 3 replicas
+    String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+    
+    sendDoc(1);
+    
+    List<Replica> notLeaders = 
+        ensureAllReplicasAreActive(testCollectionName, 3, 10);
+    assertTrue("Expected 2 replicas for collection " + testCollectionName
+        + " but found " + notLeaders.size() + "; clusterState: "
+        + cloudClient.getZkStateReader().getClusterState(),
+        notLeaders.size() == 2);
+        
+    sendDoc(1);
+    
+    // ok, now introduce a network partition between the leader and the replica
+    SocketProxy proxy0 = null;
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+    
+    proxy0.close();
+    
+    // indexing during a partition
+    sendDoc(2);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    proxy0.reopen();
+    
+    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
+    
+    proxy1.close();
+    
+    sendDoc(3);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    proxy1.reopen();
+    
+    // sent 4 docs in so far, verify they are on the leader and replica
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, 20); 
+    
+    sendDoc(4);
+    
+    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);    
+        
+    Replica leader = 
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    assertNotNull("Could not find leader for shard1 of "+testCollectionName, leader);
+    String leaderNode = leader.getNodeName();
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    
+    // since maxShardsPerNode is 1, we're safe to kill the leader
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, 20);    
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+    proxy0.close();
+        
+    // indexing during a partition
+    // doc should be on leader and 1 replica
+    sendDoc(5);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+
+    // kill the leader
+    leaderJetty.stop();
+    
+    if (leaderJetty.isRunning())
+      fail("Failed to stop the leader on "+leaderNode);
+        
+    SocketProxy oldLeaderProxy = getProxyForReplica(leader);
+    if (oldLeaderProxy != null) {
+      oldLeaderProxy.close();      
+    } else {
+      log.warn("No SocketProxy found for old leader node "+leaderNode);      
+    }
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    Replica newLeader = 
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 30000);
+        
+    assertNotNull("No new leader was elected after 30 seconds", newLeader);
+        
+    assertTrue("Expected node "+shouldNotBeNewLeaderNode+
+        " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
+        cloudClient.getZkStateReader().getClusterState(), 
+        !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
+    
+    proxy0.reopen();
+    
+    Thread.sleep(10000L);
+    
+    cloudClient.getZkStateReader().updateClusterState(true);
+    
+    List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+    assertTrue("Expected 2 of 3 replicas to be active but only found "+activeReps.size()+"; "+activeReps, activeReps.size() == 2);
+        
+    sendDoc(6);
+    
+    assertDocsExistInAllReplicas(activeReps, testCollectionName, 1, 6);
+  }  
+  
+  protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {    
+    Map<String,Replica> activeReplicas = new HashMap<String,Replica>();    
+    ZkStateReader zkr = cloudClient.getZkStateReader();
+    ClusterState cs = zkr.getClusterState();
+    assertNotNull(cs);
+    for (Slice shard : cs.getActiveSlices(testCollectionName)) {
+      if (shard.getName().equals(shardId)) {
+        for (Replica replica : shard.getReplicas()) {
+          String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
+          if (ZkStateReader.ACTIVE.equals(replicaState) || ZkStateReader.RECOVERING.equals(replicaState)) {
+            activeReplicas.put(replica.getName(), replica);
+          }
+        }
+      }
+    }        
+    List<Replica> replicas = new ArrayList<Replica>();
+    replicas.addAll(activeReplicas.values());
+    return replicas;
+  }  
+  
+  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+    
+    SocketProxy proxy = proxies.get(baseUrl.toURI());
+    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
+      baseUrl = new URL(baseUrl.toExternalForm() + "/");
+      proxy = proxies.get(baseUrl.toURI());
+    }
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+  
+  protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
+      String testCollectionName, int firstDocId, int lastDocId)
+      throws Exception {
+    Replica leader = 
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
+    HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName);
+    List<HttpSolrServer> replicas = 
+        new ArrayList<HttpSolrServer>(notLeaders.size());
+    
+    for (Replica r : notLeaders) {
+      replicas.add(getHttpSolrServer(r, testCollectionName));
+    }
+    try {
+      for (int d = firstDocId; d <= lastDocId; d++) {
+        String docId = String.valueOf(d);
+        assertDocExists(leaderSolr, testCollectionName, docId);
+        for (HttpSolrServer replicaSolr : replicas) {
+          assertDocExists(replicaSolr, testCollectionName, docId);
+        }
+      }
+    } finally {
+      if (leaderSolr != null) {
+        leaderSolr.shutdown();
+      }
+      for (HttpSolrServer replicaSolr : replicas) {
+        replicaSolr.shutdown();
+      }
+    }
+  }
+  
+  protected HttpSolrServer getHttpSolrServer(Replica replica, String coll) throws Exception {
+    ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
+    String url = zkProps.getBaseUrl() + "/" + coll;
+    return new HttpSolrServer(url);
+  }
+  
+  protected void sendDoc(int docId) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField(id, String.valueOf(docId));
+    doc.addField("a_t", "hello" + docId);
+    cloudClient.add(doc);
+  }
+  
+  protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, int rf, int maxWaitSecs) throws Exception {
+    long startMs = System.currentTimeMillis();
+    
+    Map<String,Replica> notLeaders = new HashMap<String,Replica>();
+    
+    ZkStateReader zkr = cloudClient.getZkStateReader();
+    ClusterState cs = zkr.getClusterState();
+    Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
+    assertTrue(slices.size() == 1); // shards == 1
+    boolean allReplicasUp = false;
+    long waitMs = 0L;
+    long maxWaitMs = maxWaitSecs * 1000L;
+    Replica leader = null;
+    while (waitMs < maxWaitMs && !allReplicasUp) {
+      cs = zkr.getClusterState();
+      assertNotNull(cs);
+      for (Slice shard : cs.getActiveSlices(testCollectionName)) {
+        allReplicasUp = true; // assume true
+        Collection<Replica> replicas = shard.getReplicas();
+        assertTrue(replicas.size() == rf);
+        leader = shard.getLeader();
+        assertNotNull(leader);
+        
+        // ensure all replicas are "active" and identify the non-leader replica
+        for (Replica replica : replicas) {
+          String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
+          if (!ZkStateReader.ACTIVE.equals(replicaState)) {
+            log.info("Replica " + replica.getName() + " is currently " + replicaState);
+            allReplicasUp = false;
+          }
+          
+          if (!leader.equals(replica)) 
+            notLeaders.put(replica.getName(), replica);
+        }
+        
+        if (!allReplicasUp) {
+          try {
+            Thread.sleep(500L);
+          } catch (Exception ignoreMe) {}
+          waitMs += 500L;
+        }
+      }
+    } // end while
+    
+    if (!allReplicasUp) 
+      fail("Didn't see all replicas come up within " + maxWaitMs + " ms! ClusterState: " + cs);
+    
+    if (notLeaders.isEmpty()) 
+      fail("Didn't isolate any replicas that are not the leader! ClusterState: " + cs);
+    
+    long diffMs = (System.currentTimeMillis() - startMs);
+    log.info("Took " + diffMs + " ms to see all replicas become active.");
+    
+    List<Replica> replicas = new ArrayList<Replica>();
+    replicas.addAll(notLeaders.values());
+    return replicas;
+  }
+  
+  /**
+   * Query the real-time get handler for a specific doc by ID to verify it
+   * exists in the provided server.
+   */
+  @SuppressWarnings("rawtypes")
+  protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
+    QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId));
+    NamedList rsp = solr.request(qr);
+    String match = 
+        JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
+    assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+        + " due to: " + match, match == null);
+  }
+  
+  protected JettySolrRunner getJettyOnPort(int port) {
+    JettySolrRunner theJetty = null;
+    for (JettySolrRunner jetty : jettys) {
+      if (port == jetty.getLocalPort()) {
+        theJetty = jetty;
+        break;
+      }
+    }    
+    
+    if (theJetty == null) {
+      if (controlJetty.getLocalPort() == port) {
+        theJetty = controlJetty;
+      }
+    }
+    
+    if (theJetty == null)
+      fail("Not able to find JettySolrRunner for port: "+port);
+    
+    return theJetty;
+  }
+  
+  protected int getReplicaPort(Replica replica) {
+    String replicaNode = replica.getNodeName();    
+    String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
+    if (tmp.indexOf('_') != -1)
+      tmp = tmp.substring(0,tmp.indexOf('_'));
+    return Integer.parseInt(tmp);    
+  }  
+}
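
HttpPartitionTest relies on a SocketProxy sitting in front of each Jetty instance so the test can sever and restore the HTTP path between leader and replica without touching the ZooKeeper connection or resorting to IPTables. The SocketProxy class itself is added in part 2/2 of this commit message; the sketch below is only a rough, self-contained illustration of the idea (TinyTcpProxy and its pump helper are names invented here, not the committed class): close() stops accepting and tears down existing connections to simulate the partition, and reopen() starts listening again to heal it.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class TinyTcpProxy {
  private final int listenPort;
  private final InetSocketAddress target;
  private final List<Socket> open = new CopyOnWriteArrayList<>();
  private volatile ServerSocket server;

  TinyTcpProxy(int listenPort, String targetHost, int targetPort) {
    this.listenPort = listenPort;
    this.target = new InetSocketAddress(targetHost, targetPort);
  }

  // Start (or restart) listening and forwarding traffic to the target.
  synchronized void reopen() throws IOException {
    final ServerSocket ss = new ServerSocket(listenPort);
    server = ss;
    Thread acceptor = new Thread() {
      @Override
      public void run() {
        try {
          while (true) {
            Socket client = ss.accept();
            Socket upstream = new Socket();
            upstream.connect(target);
            open.add(client);
            open.add(upstream);
            pump(client.getInputStream(), upstream.getOutputStream());
            pump(upstream.getInputStream(), client.getOutputStream());
          }
        } catch (IOException closed) {
          // listener was closed: the simulated partition is in effect
        }
      }
    };
    acceptor.setDaemon(true);
    acceptor.start();
  }

  // Simulate a network partition: stop accepting and kill existing connections.
  synchronized void close() throws IOException {
    if (server != null) {
      server.close();
    }
    for (Socket s : open) {
      s.close();
    }
    open.clear();
  }

  // Copy bytes one way on a daemon thread until either side goes away.
  private static void pump(final InputStream in, final OutputStream out) {
    Thread t = new Thread() {
      @Override
      public void run() {
        byte[] buf = new byte[8192];
        try {
          int n;
          while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
          }
        } catch (IOException ignored) {
          // connection torn down, typically by close()
        }
      }
    };
    t.setDaemon(true);
    t.start();
  }
}

A test in the style of testRf2 above would point the replica's base URL at the proxy's listen port, call close() while indexing so the leader's forwarded update fails, then reopen() and wait for the replica to report ACTIVE again.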


