lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1237803 - in /lucene/dev/trunk/solr/core/src: java/org/apache/solr/cloud/Overseer.java test/org/apache/solr/cloud/OverseerTest.java
Date Mon, 30 Jan 2012 17:16:55 GMT
Author: markrmiller
Date: Mon Jan 30 17:16:54 2012
New Revision: 1237803

URL: http://svn.apache.org/viewvc?rev=1237803&view=rev
Log:
SOLR-3075: Overseer does not check cloudstate for previously assigned shardId but generates
a new one

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1237803&r1=1237802&r2=1237803&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Jan 30 17:16:54
2012
@@ -176,20 +176,24 @@ public class Overseer implements NodeSta
         return false;
       }
       /**
-       * Try to assign core to the cluster
+       * Try to assign core to the cluster. 
        * @throws KeeperException 
        * @throws InterruptedException 
        */
       private CloudState updateState(CloudState state, String nodeName, CoreState coreState)
throws KeeperException, InterruptedException {
         String collection = coreState.getCollectionName();
         String zkCoreNodeName = coreState.getCoreNodeName();
-        
-          String shardId;
-          if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
-            shardId = AssignShard.assignShard(collection, state);
-          } else {
-            shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
-          }
+
+        // use the provided non null shardId
+        String shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
+        if(shardId==null) {
+          //use shardId from CloudState
+          shardId = getAssignedId(state, nodeName, coreState);
+        }
+        if(shardId==null) {
+          //request new shardId 
+          shardId = AssignShard.assignShard(collection, state);
+        }
           
           Map<String,String> props = new HashMap<String,String>();
           for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
@@ -209,6 +213,23 @@ public class Overseer implements NodeSta
           CloudState newCloudState = updateSlice(state, collection, slice);
           return newCloudState;
       }
+
+      /*
+       * Return an already assigned id or null if not assigned
+       */
+      private String getAssignedId(final CloudState state, final String nodeName,
+          final CoreState coreState) {
+        final String key = coreState.getProperties().get(ZkStateReader.NODE_NAME_PROP) +
"_" +  coreState.getProperties().get(ZkStateReader.CORE_NAME_PROP);
+        Map<String, Slice> slices = state.getSlices(coreState.getCollectionName());
+        if (slices != null) {
+          for (Slice slice : slices.values()) {
+            if (slice.getShards().get(key) != null) {
+              return slice.getName();
+            }
+          }
+        }
+        return null;
+      }
       
       private CloudState updateSlice(CloudState state, String collection, Slice slice) {
         
@@ -480,6 +501,7 @@ public class Overseer implements NodeSta
     Set<String> downNodes = complement(oldLiveNodes, liveNodes);
     for(String node: downNodes) {
       NodeStateWatcher watcher = nodeStateWatches.remove(node);
+      log.debug("Removed NodeStateWatcher for node:" + node);
     }
   }
   
@@ -491,6 +513,7 @@ public class Overseer implements NodeSta
         if (!nodeStateWatches.containsKey(nodeName)) {
           zkCmdExecutor.ensureExists(path, zkClient);
           nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
+          log.debug("Added NodeStateWatcher for node " + nodeName);
         } else {
           log.debug("watch already added");
         }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1237803&r1=1237802&r2=1237803&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Mon Jan 30
17:16:54 2012
@@ -40,6 +40,7 @@ import org.apache.solr.core.CoreDescript
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -49,6 +50,54 @@ public class OverseerTest extends SolrTe
   private static final boolean DEBUG = false;
 
   
+  private static class MockZKController{
+    
+    private final SolrZkClient zkClient;
+    private final String nodeName;
+    
+    public MockZKController(String zkAddress, String nodeName) throws InterruptedException,
TimeoutException, IOException, KeeperException {
+      this.nodeName = nodeName;
+      zkClient = new SolrZkClient(zkAddress, TIMEOUT);
+      Overseer.createClientNodes(zkClient, nodeName);
+      
+      // live node
+      final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
+      zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
+    }
+
+    private void deleteNode(final String path) {
+      try {
+        Stat stat = zkClient.exists(path, null, false);
+        zkClient.delete(path, stat.getVersion(), false);
+      } catch (KeeperException e) {
+        fail("Unexpected KeeperException!" + e);
+      } catch (InterruptedException e) {
+        fail("Unexpected InterruptedException!" + e);
+      }
+    }
+
+    public void close(){
+      try {
+        deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1");
+        zkClient.close();
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+    
+    public void publishState(String coreName, String stateName) throws KeeperException, InterruptedException{
+      HashMap<String,String> coreProps = new HashMap<String,String>();
+      coreProps.put(ZkStateReader.STATE_PROP, stateName);
+      coreProps.put(ZkStateReader.NODE_NAME_PROP, nodeName);
+      coreProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+      CoreState state = new CoreState(coreName, "collection1", coreProps);
+      final String statePath = Overseer.STATES_NODE + "/" + nodeName;
+      zkClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}), true);
+    }
+    
+  }
+  
   @BeforeClass
   public static void beforeClass() throws Exception {
     initCore();
@@ -438,11 +487,11 @@ public class OverseerTest extends SolrTe
     SolrZkClient controllerClient = null;
     SolrZkClient overseerClient = null;
     ZkStateReader reader = null;
+    MockZKController mockController = null;
     
     try {
       server.run();
       controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-      
       AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
       AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
       controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
@@ -450,45 +499,35 @@ public class OverseerTest extends SolrTe
       reader = new ZkStateReader(controllerClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      Overseer.createClientNodes(controllerClient, "node1");
-      overseerClient = electNewOverseer(server.getZkAddress());
-      
-      // live node
-      final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
-      controllerClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
+      mockController = new MockZKController(server.getZkAddress(), "node1");
       
-      HashMap<String,String> coreProps = new HashMap<String,String>();
-      coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
-      coreProps.put(ZkStateReader.NODE_NAME_PROP, "node1");
-      CoreState state = new CoreState("core1", "collection1", coreProps);
-      
-      final String statePath = Overseer.STATES_NODE + "/node1";
-      // publish node state (recovering)
-      controllerClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}),
true);
+      overseerClient = electNewOverseer(server.getZkAddress());
+
+      mockController.publishState("core1", ZkStateReader.RECOVERING);
       
       // wait overseer assignment
       waitForSliceCount(reader, "collection1", 1);
       
       verifyStatus(reader, ZkStateReader.RECOVERING);
 
-      // publish node state (active)
-      coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
-      coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
-      state = new CoreState("core1", "collection1", coreProps);
-      controllerClient.setData(statePath,
-          ZkStateReader.toJSON(new CoreState[] {state}), true);
+      int version = getCloudStateVersion(controllerClient);
+      
+      mockController.publishState("core1", ZkStateReader.ACTIVE);
+      
+      while(version == getCloudStateVersion(controllerClient));
 
       verifyStatus(reader, ZkStateReader.ACTIVE);
+      version = getCloudStateVersion(controllerClient);
       overseerClient.close();
-      
-      coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
-      state = new CoreState("core1", "collection1", coreProps);
-             
-      controllerClient.setData(statePath,
-          ZkStateReader.toJSON(new CoreState[] {state}), true);
+      Thread.sleep(1000); //wait for overseer to get killed
 
-      overseerClient = electNewOverseer(server.getZkAddress());
+      mockController.publishState("core1", ZkStateReader.RECOVERING);
+      version = getCloudStateVersion(controllerClient);
       
+      overseerClient = electNewOverseer(server.getZkAddress());
+
+      while(version == getCloudStateVersion(controllerClient));
+
       verifyStatus(reader, ZkStateReader.RECOVERING);
       
       assertEquals("Live nodes count does not match", 1, reader.getCloudState()
@@ -497,6 +536,10 @@ public class OverseerTest extends SolrTe
           .getSlice("collection1", "shard1").getShards().size());      
     } finally {
       
+      if (mockController != null) {
+        mockController.close();
+      }
+      
       if (overseerClient != null) {
        overseerClient.close();
       }
@@ -509,6 +552,80 @@ public class OverseerTest extends SolrTe
       server.shutdown();
     }
   }
+  
+  @Test
+  public void testDoubleAssignment() throws Exception {
+    String zkDir = dataDir.getAbsolutePath() + File.separator
+        + "zookeeper/server1/data";
+    
+    System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "2");
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+    
+    SolrZkClient controllerClient = null;
+    SolrZkClient overseerClient = null;
+    ZkStateReader reader = null;
+    MockZKController mockController = null;
+    
+    try {
+      server.run();
+      controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+      
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+      controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
+      
+      reader = new ZkStateReader(controllerClient);
+      reader.createClusterStateWatchersAndUpdate();
+
+      mockController = new MockZKController(server.getZkAddress(), "node1");
+      
+      overseerClient = electNewOverseer(server.getZkAddress());
+
+      mockController.publishState("core1", ZkStateReader.RECOVERING);
+
+      // wait overseer assignment
+      waitForSliceCount(reader, "collection1", 1);
+      
+      verifyStatus(reader, ZkStateReader.RECOVERING);
+
+      mockController.close();
+
+      int version = getCloudStateVersion(controllerClient);
+      
+      mockController = new MockZKController(server.getZkAddress(), "node1");
+      mockController.publishState("core1", ZkStateReader.RECOVERING);
+
+      while (version == getCloudStateVersion(controllerClient));
+      
+      reader.updateCloudState(true);
+      CloudState state = reader.getCloudState();
+      assertEquals("more than 1 shard id was assigned to same core", 1, state.getSlices("collection1").size());
+
+    } finally {
+      System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
+      if (overseerClient != null) {
+       overseerClient.close();
+      }
+      if (mockController != null) {
+        mockController.close();
+      }
+
+      if (controllerClient != null) {
+        controllerClient.close();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+      server.shutdown();
+    }
+  }
+
+  private int getCloudStateVersion(SolrZkClient controllerClient)
+      throws KeeperException, InterruptedException {
+    return controllerClient.exists(ZkStateReader.CLUSTER_STATE, null, false).getVersion();
+  }
+
 
   private SolrZkClient electNewOverseer(String address) throws InterruptedException,
       TimeoutException, IOException, KeeperException {



Mime
View raw message