lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r1688396 - in /lucene/dev/trunk/solr: CHANGES.txt core/src/java/org/apache/solr/cloud/Assign.java core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
Date Tue, 30 Jun 2015 08:27:51 GMT
Author: shalin
Date: Tue Jun 30 08:27:50 2015
New Revision: 1688396

URL: http://svn.apache.org/r1688396
Log:
SOLR-7673: Race condition in shard splitting can cause operation to hang indefinitely or sub-shards
to never become active

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Assign.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1688396&r1=1688395&r2=1688396&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Jun 30 08:27:50 2015
@@ -189,6 +189,9 @@ Bug Fixes
 * SOLR-6686: facet.threads can return wrong results when using facet.prefix multiple
   times on same field. (Michael Ryan, Tim Underwood via shalin)
 
+* SOLR-7673: Race condition in shard splitting can cause operation to hang indefinitely
+  or sub-shards to never become active. (shalin)
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Assign.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Assign.java?rev=1688396&r1=1688395&r2=1688396&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Assign.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Assign.java Tue Jun 30 08:27:50 2015
@@ -17,6 +17,18 @@ package org.apache.solr.cloud;
  * the License.
  */
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.solr.cloud.rule.ReplicaAssigner;
 import org.apache.solr.cloud.rule.Rule;
 import org.apache.solr.common.SolrException;
@@ -30,20 +42,9 @@ import org.apache.solr.core.CoreContaine
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
@@ -120,16 +121,34 @@ public class Assign {
     return returnShardId;
   }
 
-  static   class Node {
-    public  final String nodeName;
-    public int thisCollectionNodes=0;
-    public int totalNodes=0;
+  static String buildCoreName(DocCollection collection, String shard) {
+    Slice slice = collection.getSlice(shard);
+    int replicaNum = slice.getReplicas().size();
+    for (;;) {
+      String replicaName = collection.getName() + "_" + shard + "_replica" + replicaNum;
+      boolean exists = false;
+      for (Replica replica : slice.getReplicas()) {
+        if (replicaName.equals(replica.getStr(CORE_NAME_PROP))) {
+          exists = true;
+          break;
+        }
+      }
+      if (exists) replicaNum++;
+      else break;
+    }
+    return collection.getName() + "_" + shard + "_replica" + replicaNum;
+  }
+
+  static class Node {
+    public final String nodeName;
+    public int thisCollectionNodes = 0;
+    public int totalNodes = 0;
 
     Node(String nodeName) {
       this.nodeName = nodeName;
     }
 
-    public int weight(){
+    public int weight() {
       return (thisCollectionNodes * 100) + totalNodes;
     }
   }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1688396&r1=1688395&r2=1688396&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Tue Jun 30 08:27:50 2015
@@ -150,6 +150,8 @@ public class OverseerCollectionProcessor
 
   public static final String ONLY_ACTIVE_NODES = "onlyactivenodes";
 
+  private static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
+
   public int maxParallelThreads = 10;
 
   public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
@@ -1494,7 +1496,7 @@ public class OverseerCollectionProcessor
       collectShardResponses(results, false, null, shardHandler);
       
       final String asyncId = message.getStr(ASYNC);
-      HashMap<String,String> requestMap = new HashMap<String,String>();
+      HashMap<String,String> requestMap = new HashMap<>();
       
       for (int i = 0; i < subRanges.size(); i++) {
         String subSlice = subSlices.get(i);
@@ -1627,25 +1629,33 @@ public class OverseerCollectionProcessor
       List<String> nodeList = new ArrayList<>(nodes.size());
       nodeList.addAll(nodes);
       
-      Collections.shuffle(nodeList, RANDOM);
-      
       // TODO: Have maxShardsPerNode param for this operation?
       
       // Remove the node that hosts the parent shard for replica creation.
       nodeList.remove(nodeName);
       
       // TODO: change this to handle sharding a slice into > 2 sub-shards.
-      
+
+      List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
       for (int i = 1; i <= subSlices.size(); i++) {
         Collections.shuffle(nodeList, RANDOM);
         String sliceName = subSlices.get(i - 1);
         for (int j = 2; j <= repFactor; j++) {
           String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
           String shardName = collectionName + "_" + sliceName + "_replica" + (j);
-          
+
           log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
               + collectionName + " on " + subShardNodeName);
-              
+
+          ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+              ZkStateReader.COLLECTION_PROP, collectionName,
+              ZkStateReader.SHARD_ID_PROP, sliceName,
+              ZkStateReader.CORE_NAME_PROP, shardName,
+              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+              ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
+              ZkStateReader.NODE_NAME_PROP, subShardNodeName);
+          Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
+
           HashMap<String,Object> propMap = new HashMap<>();
           propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
           propMap.put(COLLECTION_PROP, collectionName);
@@ -1662,34 +1672,17 @@ public class OverseerCollectionProcessor
           if (asyncId != null) {
             propMap.put(ASYNC, asyncId);
           }
-          addReplica(clusterState, new ZkNodeProps(propMap), results);
-          
-          String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName);
-          // wait for the replicas to be seen as active on sub shard leader
-          log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
-          CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
-          cmd.setCoreName(subShardNames.get(i - 1));
-          cmd.setNodeName(subShardNodeName);
-          cmd.setCoreNodeName(coreNodeName);
-          cmd.setState(Replica.State.RECOVERING);
-          cmd.setCheckLive(true);
-          cmd.setOnlyIfLeader(true);
-          ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
-          
-          sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
-          
+          // special flag param to instruct addReplica not to create the replica in cluster state again
+          propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
+
+          replicas.add(propMap);
         }
       }
-      
-      collectShardResponses(results, true,
-          "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up", shardHandler);
-          
-      completeAsyncRequest(asyncId, requestMap, results);
-      
-      log.info("Successfully created all replica shards for all sub-slices " + subSlices);
-      
-      commit(results, slice, parentShardLeader);
-      
+
+      // we must set the slice state into recovery before actually creating the replica cores
+      // this ensures that the logic inside Overseer to update sub-shard state to 'active'
+      // always gets a chance to execute. See SOLR-7673
+
       if (repFactor == 1) {
         // switch sub shard states to 'active'
         log.info("Replication factor is 1 so switching shard states");
@@ -1715,6 +1708,20 @@ public class OverseerCollectionProcessor
         ZkNodeProps m = new ZkNodeProps(propMap);
         inQueue.offer(ZkStateReader.toJSON(m));
       }
+
+      // now actually create replica cores on sub shard nodes
+      for (Map<String, Object> replica : replicas) {
+        addReplica(clusterState, new ZkNodeProps(replica), results);
+      }
+      
+      collectShardResponses(results, true,
+          "SPLITSHARD failed to create subshard replicas", shardHandler);
+          
+      completeAsyncRequest(asyncId, requestMap, results);
+      
+      log.info("Successfully created all replica shards for all sub-slices " + subSlices);
+      
+      commit(results, slice, parentShardLeader);
       
       return true;
     } catch (SolrException e) {
@@ -2456,8 +2463,7 @@ public class OverseerCollectionProcessor
     Map<String, Replica> result = new HashMap<>();
     long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
     while (true) {
-      DocCollection coll = zkStateReader.getClusterState().getCollection(
-          collectionName);
+      DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
       for (String coreName : coreNames) {
         if (result.containsKey(coreName)) continue;
         for (Slice slice : coll.getSlices()) {
@@ -2474,7 +2480,7 @@ public class OverseerCollectionProcessor
         return result;
       }
       if (System.nanoTime() > endTime) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas in cluster state.");
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state.");
       }
       
       Thread.sleep(100);
@@ -2511,23 +2517,8 @@ public class OverseerCollectionProcessor
       throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
     }
     if (coreName == null) {
-      // assign a name to this core
-      Slice slice = coll.getSlice(shard);
-      int replicaNum = slice.getReplicas().size();
-      for (;;) {
-        String replicaName = collection + "_" + shard + "_replica" + replicaNum;
-        boolean exists = false;
-        for (Replica replica : slice.getReplicas()) {
-          if (replicaName.equals(replica.getStr(CORE_NAME_PROP))) {
-            exists = true;
-            break;
-          }
-        }
-        if (exists) replicaNum++;
-        else break;
-      }
-      coreName = collection + "_" + shard + "_replica" + replicaNum;
-    } else {
+      coreName = Assign.buildCoreName(coll, shard);
+    } else if (!message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false)) {
       //Validate that the core name is unique in that collection
       for (Slice slice : coll.getSlices()) {
         for (Replica replica : slice.getReplicas()) {
@@ -2542,11 +2533,13 @@ public class OverseerCollectionProcessor
     ModifiableSolrParams params = new ModifiableSolrParams();
     
     if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
-      ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP,
-          collection, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName,
-          ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP,
-          zkStateReader.getBaseUrlForNodeName(node));
-      Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
+      if (!message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false)) {
+        ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP,
+            collection, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName,
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP,
+            zkStateReader.getBaseUrlForNodeName(node), ZkStateReader.NODE_NAME_PROP, node);
+        Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
+      }
       params.set(CoreAdminParams.CORE_NODE_NAME,
           waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
     }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java?rev=1688396&r1=1688395&r2=1688396&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java Tue Jun 30 08:27:50 2015
@@ -72,7 +72,8 @@ public class SliceMutator {
         makeMap(
             ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
             ZkStateReader.BASE_URL_PROP, message.getStr(ZkStateReader.BASE_URL_PROP),
-            ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP)));
+            ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
+            ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP)));
     return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
   }
 



Mime
View raw message