lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1176520 - in /lucene/dev/branches/solrcloud/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/ solrj/src/java/org/apache/zooke...
Date Tue, 27 Sep 2011 18:48:02 GMT
Author: markrmiller
Date: Tue Sep 27 18:48:01 2011
New Revision: 1176520

URL: http://svn.apache.org/viewvc?rev=1176520&view=rev
Log:
SOLR-2752: leader-per-shard experimentation

Added:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CurrentCoreDescriptorProvider.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java   (with props)
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/zookeeper/
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/zookeeper/SolrZooKeeper.java   (with props)
Removed:
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
Modified:
    lucene/dev/branches/solrcloud/solr/build.xml
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Modified: lucene/dev/branches/solrcloud/solr/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/build.xml?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/build.xml (original)
+++ lucene/dev/branches/solrcloud/solr/build.xml Tue Sep 27 18:48:01 2011
@@ -458,7 +458,7 @@
           <packageset dir="contrib/extraction/src/java"/>
           <packageset dir="contrib/uima/src/java"/>
           <group title="Core" packages="org.apache.*" />
-          <group title="SolrJ" packages="org.apache.solr.common.*,org.apache.solr.client.solrj*" />
+          <group title="SolrJ" packages="org.apache.solr.common.*,org.apache.solr.client.solrj.*,org.apache.zookeeper.*" />
           <group title="contrib: Clustering" packages="org.apache.solr.handler.clustering*" />
           <group title="contrib: DataImportHandler" packages="org.apache.solr.handler.dataimport*" />
           <group title="contrib: Solr Cell" packages="org.apache.solr.handler.extraction*" />

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CurrentCoreDescriptorProvider.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CurrentCoreDescriptorProvider.java?rev=1176520&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CurrentCoreDescriptorProvider.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CurrentCoreDescriptorProvider.java Tue Sep 27 18:48:01 2011
@@ -0,0 +1,29 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.List;
+
+import org.apache.solr.core.CoreDescriptor;
+
+/** Provide the current list of registered {@link CoreDescriptor}s. */
+public abstract class CurrentCoreDescriptorProvider {
+  /** @return the currently registered descriptors; the reconnect code in
+   *  ZkController null-checks the result, so null appears to be tolerated */
+  public abstract List<CoreDescriptor> getCurrentDescriptors();
+}

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java?rev=1176520&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java Tue Sep 27 18:48:01 2011
@@ -0,0 +1,301 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Per slice Leader Election process. This class contains the logic by which a
+ * leader shard for a slice is chosen. First call
+ * {@link #setupForSlice(String, String)} to ensure the election process is
+ * init'd for a new slice. Next call
+ * {@link #joinElection(String, String, String)} to start the leader election.
+ * 
+ * The implementation follows the classic ZooKeeper recipe of creating an
+ * ephemeral, sequential node for each shard and then looking at the set of such
+ * nodes - if the created node is the lowest sequential node, the shard that
+ * created the node is the leader. If not, the shard puts a watch on the next
+ * lowest node it finds, and if that node goes down, starts the whole process
+ * over by checking if it's the lowest sequential node, etc.
+ * 
+ */
+public class SliceLeaderElector {
+  private static final Logger log = LoggerFactory.getLogger(SliceLeaderElector.class);
+  // child of the per-slice election path under which the elected leader registers
+  private static final String LEADER_NODE = "/leader";
+  // child of the per-slice election path that holds the sequential n_* nodes
+  private static final String ELECTION_NODE = "/election";
+  // captures the digits of an n_* node name, optionally preceded by a path
+  private static final Pattern LEADER_SEQ = Pattern.compile(".*?/?n_(\\d+)");
+  
+  private final SolrZkClient zkClient;
+  /** @param zkClient client used for every ZooKeeper call made by this elector */
+  public SliceLeaderElector(SolrZkClient zkClient) {
+    this.zkClient = zkClient;
+  }
+  
+  /**
+   * Check if the shard with the given n_* sequence number is the slice leader.
+   * If it is, register leaderId under the slice leader zk node. If it is not,
+   * watch the closest election node in front of this one and run this check
+   * again whenever that watch fires or the watched node is already gone.
+   * 
+   * @param shardId slice whose election is being evaluated
+   * @param collection collection that the slice belongs to
+   * @param seq sequence number of this shard's own election node
+   * @param leaderId node name created under the leader node on winning
+   * @throws KeeperException on any ZooKeeper failure other than a missing watched node
+   * @throws InterruptedException if a ZooKeeper call is interrupted
+   * @throws UnsupportedEncodingException declared by the leader registration step
+   */
+  private void checkIfIamLeader(final String shardId, final String collection,
+      final int seq, final String leaderId) throws KeeperException,
+      InterruptedException, UnsupportedEncodingException {
+    // get all other numbers...
+    String holdElectionPath = getElectionPath(shardId, collection)
+        + ELECTION_NODE;
+    List<String> seqs = zkClient.getChildren(holdElectionPath, null);
+    sortSeqs(seqs);
+    List<Integer> intSeqs = getSeqs(seqs);
+    if (intSeqs.isEmpty()) {
+      // not even our own node is left - fail clearly instead of on get(0)
+      throw new IllegalStateException("No election nodes under:"
+          + holdElectionPath);
+    }
+    if (seq <= intSeqs.get(0)) {
+      runIamLeaderProcess(shardId, collection, leaderId);
+    } else {
+      // I am not the leader - find the closest node in front of me. The first
+      // entry is known to be lower than seq, so the index is never negative,
+      // even when our own node is missing from the list.
+      int prev = 0;
+      while (prev + 1 < intSeqs.size() && intSeqs.get(prev + 1) < seq) {
+        prev++;
+      }
+      try {
+        zkClient.getData(holdElectionPath + "/" + seqs.get(prev),
+            new Watcher() {
+              @Override
+              public void process(WatchedEvent event) {
+                // am I the next leader?
+                try {
+                  checkIfIamLeader(shardId, collection, seq, leaderId);
+                } catch (UnsupportedEncodingException e) {
+                  log.error("", e);
+                  throw new ZooKeeperException(
+                      SolrException.ErrorCode.SERVER_ERROR, "", e);
+                } catch (KeeperException e) {
+                  log.error("", e);
+                  throw new ZooKeeperException(
+                      SolrException.ErrorCode.SERVER_ERROR, "", e);
+                } catch (InterruptedException e) {
+                  // Restore the interrupted status
+                  Thread.currentThread().interrupt();
+                  log.warn("", e);
+                  throw new ZooKeeperException(
+                      SolrException.ErrorCode.SERVER_ERROR, "", e);
+                }
+              }
+            }, null);
+      } catch (KeeperException.NoNodeException e) {
+        // the node in front of us is already gone - check again. Any other
+        // failure is rethrown: retrying it here would recurse without bound.
+        checkIfIamLeader(shardId, collection, seq, leaderId);
+      }
+    }
+  }
+
+  private void runIamLeaderProcess(final String shardId,
+      final String collection, final String leaderId) throws KeeperException,
+      InterruptedException, UnsupportedEncodingException {
+    String currentLeaderZkPath = getElectionPath(shardId, collection)
+        + LEADER_NODE;
+    zkClient.makePath(currentLeaderZkPath + "/" + leaderId,  CreateMode.EPHEMERAL);
+  }
+  
+  /**
+   * Builds the election root of a slice: the collections node, the collection,
+   * the leader elect node and the shard id, without a trailing slash.
+   * @param shardId slice the path is built for
+   * @param collection collection that owns the slice
+   * @return the election root path of the slice
+   */
+  private String getElectionPath(String shardId, String collection) {
+    String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
+    return collectionPath + ZkStateReader.LEADER_ELECT_ZKNODE + "/" + shardId;
+  }
+  
+  /**
+   * Returns int given String of form n_0000000001 or n_0000000003, etc.
+   * 
+   * @param nStringSequence
+   * @return
+   */
+  private int getSeq(String nStringSequence) {
+    int seq = 0;
+    Matcher m = LEADER_SEQ.matcher(nStringSequence);
+    if (m.matches()) {
+      seq = Integer.parseInt(m.group(1));
+    } else {
+      throw new IllegalStateException("Could not find regex match in:"
+          + nStringSequence);
+    }
+    return seq;
+  }
+  
+  /**
+   * Returns int list given list of form n_0000000001, n_0000000003, etc.
+   * 
+   * @param seqs
+   * @return
+   */
+  private List<Integer> getSeqs(List<String> seqs) {
+    List<Integer> intSeqs = new ArrayList<Integer>(seqs.size());
+    for (String seq : seqs) {
+      intSeqs.add(getSeq(seq));
+    }
+    return intSeqs;
+  }
+  
+  /**
+   * Begin participating in the election process. Creates a new ephemeral,
+   * sequential n_* node under the slice's election node and then checks
+   * whether that node makes this shard the leader; if it does not, the node
+   * in front of it is watched. A missing election node is retried with a
+   * short pause, and the attempt is given up after repeated failures.
+   * 
+   * @param shardId slice to join the election for
+   * @param collection collection that the slice belongs to
+   * @param shardZkNodeName name registered under the leader node if elected
+   * @return sequential node number
+   * @throws KeeperException on ZooKeeper failures that are not retried
+   * @throws InterruptedException if interrupted while creating the node or pausing
+   * @throws UnsupportedEncodingException declared by the leader check
+   */
+  public int joinElection(String shardId, String collection,
+      String shardZkNodeName) throws KeeperException, InterruptedException,
+      UnsupportedEncodingException {
+    final String shardsElectZkPath = getElectionPath(shardId, collection)
+        + SliceLeaderElector.ELECTION_NODE;
+    
+    String createdPath = null;
+    int failures = 0;
+    boolean created = false;
+    do {
+      try {
+        createdPath = zkClient.create(shardsElectZkPath + "/n_", null,
+            CreateMode.EPHEMERAL_SEQUENTIAL);
+        created = true;
+      } catch (KeeperException.NoNodeException e) {
+        // the election node is not there yet - whoever sets it up may still
+        // be working on it, so pause briefly and try again
+        if (failures > 9) {
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        }
+        failures++;
+        Thread.sleep(50);
+      }
+    } while (!created);
+    final int seq = getSeq(createdPath);
+    checkIfIamLeader(shardId, collection, seq, shardZkNodeName);
+    
+    return seq;
+  }
+  
+  /**
+   * Set up any ZooKeeper nodes needed per shardId (slice) for leader election:
+   * the persistent election node and the persistent current leader node.
+   * 
+   * @param shardId slice to prepare
+   * @param collection collection that the slice belongs to
+   * @throws InterruptedException if a ZooKeeper call is interrupted
+   * @throws KeeperException on any ZooKeeper failure other than NODEEXISTS
+   */
+  public void setupForSlice(final String shardId, final String collection)
+      throws InterruptedException, KeeperException {
+    // both nodes live directly beneath the slice's election path
+    final String sliceRoot = getElectionPath(shardId, collection);
+    
+    // leader election node
+    makePersistentIfMissing(sliceRoot + SliceLeaderElector.ELECTION_NODE);
+    
+    // current leader node
+    makePersistentIfMissing(sliceRoot + SliceLeaderElector.LEADER_NODE);
+  }
+  
+  /**
+   * Creates the given path as a persistent node unless it already exists.
+   * Losing the creation race to another client is fine: a NODEEXISTS failure
+   * is ignored, while every other KeeperException is rethrown.
+   * 
+   * @param zkPath node to create when it is missing
+   * @throws InterruptedException if a ZooKeeper call is interrupted
+   * @throws KeeperException on any ZooKeeper failure other than NODEEXISTS
+   */
+  private void makePersistentIfMissing(final String zkPath)
+      throws InterruptedException, KeeperException {
+    try {
+      if (zkClient.exists(zkPath)) {
+        // nothing to create
+        return;
+      }
+      
+      // make the new node
+      zkClient.makePath(zkPath, CreateMode.PERSISTENT, null);
+    } catch (KeeperException e) {
+      // its okay if another beats us creating the node
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        throw e;
+      }
+    }
+  }
+  
+  /**
+   * Sort n string sequence list.
+   * 
+   * @param seqs
+   */
+  private void sortSeqs(List<String> seqs) {
+    Collections.sort(seqs, new Comparator<String>() {
+      
+      @Override
+      public int compare(String o1, String o2) {
+        return Integer.valueOf(getSeq(o1)).compareTo(
+            Integer.valueOf(getSeq(o2)));
+      }
+    });
+  }
+}

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Tue Sep 27 18:48:01 2011
@@ -19,8 +19,10 @@ package org.apache.solr.cloud;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
@@ -34,6 +36,7 @@ import org.apache.solr.common.cloud.ZkNo
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.CoreDescriptor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -61,7 +64,7 @@ public final class ZkController {
   private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
   private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
 
-
+  
   // package private for tests
 
   static final String CONFIGS_ZKNODE = "/configs";
@@ -73,6 +76,8 @@ public final class ZkController {
   
   private ZkStateReader zkStateReader;
 
+  private SliceLeaderElector leaderElector;
+  
   private String zkServerAddress;
 
   private String localHostPort;
@@ -82,8 +87,11 @@ public final class ZkController {
 
   private String hostName;
 
+
+
   /**
-   * @param zkServerAddress ZooKeeper server host address
+   * @param coreContainer
+   * @param zkServerAddress
    * @param zkClientTimeout
    * @param zkClientConnectTimeout
    * @param localHost
@@ -94,8 +102,9 @@ public final class ZkController {
    * @throws IOException
    */
   public ZkController(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
-      String localHostContext) throws InterruptedException,
+      String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
       TimeoutException, IOException {
+ 
     this.zkServerAddress = zkServerAddress;
     this.localHostPort = locaHostPort;
     this.localHostContext = localHostContext;
@@ -107,10 +116,22 @@ public final class ZkController {
 
           public void command() {
             try {
+              // we need to create all of our lost watches
               zkStateReader.makeCollectionsNodeWatches();
               zkStateReader.makeShardsWatches(true);
               createEphemeralLiveNode();
               zkStateReader.updateCloudState(false);
+              
+              // re register all descriptors
+              List<CoreDescriptor> descriptors = registerOnReconnect
+                  .getCurrentDescriptors();
+              if (descriptors != null) {
+                for (CoreDescriptor descriptor : descriptors) {
+                  register(descriptor.getName(),
+                      descriptor.getCloudDescriptor());
+                }
+              }
+
             } catch (KeeperException e) {
               log.error("", e);
               throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -129,20 +150,27 @@ public final class ZkController {
 
           }
         });
+    
+    leaderElector = new SliceLeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient);
     init();
   }
 
   /**
+   * Adds the /collection/shards/shards_id node as well as the /collections/leader_elect/shards_id node.
+   * 
    * @param shardId
    * @param collection
    * @throws IOException
    * @throws InterruptedException 
    * @throws KeeperException 
    */
-  private void addZkShardsNode(String shardId, String collection) throws IOException, InterruptedException, KeeperException {
-
-    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
+  private void addZkShardsNode(String shardId, String collection)
+      throws IOException, InterruptedException, KeeperException {
+    
+    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
+        + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
+    
     
     try {
       
@@ -154,10 +182,6 @@ public final class ZkController {
         // makes shards zkNode if it doesn't exist
         zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
         
-        // TODO: consider how these notifications are being done
-        // ping that there is a new shardId
-        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-
       }
     } catch (KeeperException e) {
       // its okay if another beats us creating the node
@@ -165,7 +189,12 @@ public final class ZkController {
         throw e;
       }
     }
-
+    
+    leaderElector.setupForSlice(shardId, collection);
+    
+    // TODO: consider how these notifications are being done
+    // ping that there is a new shardId
+    zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[]) null);
   }
 
   /**
@@ -439,26 +468,21 @@ public final class ZkController {
    * 
    * @param coreName
    * @param cloudDesc
-   * @param forcePropsUpdate update solr.xml core props even if the shard is already registered
    * @throws IOException
    * @throws KeeperException
    * @throws InterruptedException
    */
-  public void register(String coreName, CloudDescriptor cloudDesc, boolean forcePropsUpdate) throws IOException,
+  public void register(String coreName, final CloudDescriptor cloudDesc) throws IOException,
       KeeperException, InterruptedException {
     String shardUrl = localHostName + ":" + localHostPort + "/" + localHostContext
         + "/" + coreName;
     
-    String collection = cloudDesc.getCollectionName();
+    final String collection = cloudDesc.getCollectionName();
     
     String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
 
     boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
     
-    if(shardZkNodeAlreadyExists && !forcePropsUpdate) {
-      return;
-    }
-    
     if (log.isInfoEnabled()) {
       log.info("Register shard - core:" + coreName + " address:"
           + shardUrl);
@@ -473,7 +497,7 @@ public final class ZkController {
     
     String shardZkNodeName = getNodeName() + "_" + coreName;
 
-    if(shardZkNodeAlreadyExists && forcePropsUpdate) {
+    if(shardZkNodeAlreadyExists) {
       zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
       // tell everyone to update cloud info
       zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
@@ -482,6 +506,7 @@ public final class ZkController {
       try {
         zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
             CreateMode.PERSISTENT);
+        
         // tell everyone to update cloud info
         zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
       } catch (KeeperException e) {
@@ -490,11 +515,21 @@ public final class ZkController {
           throw e;
         }
         // for some reason the shard already exists, though it didn't when we
-        // started registration - just return
-        return;
+        // started registration - just continue
+        
       }
     }
+    
+    // leader election
+    doLeaderElectionProcess(cloudDesc, collection, shardZkNodeName);
+    
+  }
 
+  private void doLeaderElectionProcess(final CloudDescriptor cloudDesc,
+      final String collection, String shardZkNodeName) throws KeeperException,
+      InterruptedException, UnsupportedEncodingException {
+   
+    leaderElector.joinElection(cloudDesc.getShardId(), collection, shardZkNodeName);
   }
 
   /**

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java Tue Sep 27 18:48:01 2011
@@ -32,6 +32,7 @@ import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathExpressionException;
 
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
 import org.apache.solr.cloud.SolrZkServer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
@@ -163,7 +164,17 @@ public class CoreContainer 
         } else {
           log.info("Zookeeper client=" + zookeeperHost);          
         }
-        zkController = new ZkController(zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext);
+        zkController = new ZkController(zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, new CurrentCoreDescriptorProvider() {
+          
+          @Override
+          public List<CoreDescriptor> getCurrentDescriptors() {
+            List<CoreDescriptor> descriptors = new ArrayList<CoreDescriptor>(getCoreNames().size());
+            for (SolrCore core : getCores()) {
+              descriptors.add(core.getCoreDescriptor());
+            }
+            return descriptors;
+          }
+        });
         
         String confDir = System.getProperty("bootstrap_confdir");
         if(confDir != null) {
@@ -512,7 +523,7 @@ public class CoreContainer 
 
     if (zkController != null) {
       try {
-        zkController.register(core.getName(), core.getCoreDescriptor().getCloudDescriptor(), true);
+        zkController.register(core.getName(), core.getCoreDescriptor().getCloudDescriptor());
       } catch (InterruptedException e) {
         // Restore the interrupted status
         Thread.currentThread().interrupt();
@@ -995,7 +1006,7 @@ public class CoreContainer 
   private static final String DEF_SOLR_XML ="<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n" +
           "<solr persistent=\"false\">\n" +
           "  <cores adminPath=\"/admin/cores\" defaultCoreName=\"" + DEFAULT_DEFAULT_CORE_NAME + "\">\n" +
-          "    <core name=\""+ DEFAULT_DEFAULT_CORE_NAME + "\" instanceDir=\".\" />\n" +
+          "    <core name=\""+ DEFAULT_DEFAULT_CORE_NAME + "\" shard=\"${shard:}\" instanceDir=\".\" />\n" +
           "  </cores>\n" +
           "</solr>";
 }

Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java?rev=1176520&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java Tue Sep 27 18:48:01 2011
@@ -0,0 +1,249 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreContainer.Initializer;
+import org.apache.solr.core.SolrConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+public class LeaderElectionIntegrationTest extends SolrTestCaseJ4 {
+  protected static Logger log = LoggerFactory
+      .getLogger(LeaderElectionIntegrationTest.class); // log under this test's own name
+  
+  private final static int NUM_SHARD_REPLICAS = 5;
+  
+  private static final boolean VERBOSE = false;
+  
+  private static final Pattern HOST = Pattern
+      .compile(".*?\\:(\\d\\d\\d\\d)_.*");
+  
+  protected ZkTestServer zkServer;
+  
+  protected String zkDir;
+  
+  private Map<Integer,CoreContainer> containerMap = new HashMap<Integer,CoreContainer>();
+  
+  private Map<String,Set<Integer>> shardPorts = new HashMap<String,Set<Integer>>();
+  
+  private SolrZkClient zkClient;
+  
+  @BeforeClass
+  public static void beforeClass() throws Exception {}
+  
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    createTempDir();
+    System.setProperty("zkClientTimeout", "3000");
+    
+    zkDir = dataDir.getAbsolutePath() + File.separator
+        + "zookeeper/server1/data";
+    zkServer = new ZkTestServer(zkDir);
+    zkServer.run();
+    System.setProperty("zkHost", zkServer.getZkAddress());
+    AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(),
+        zkServer.getZkAddress(), "solrconfig.xml", "schema.xml");
+    
+    log.info("####SETUP_START " + getName());
+    
+    // set some system properties for use by tests
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+    
+    for (int i = 7000; i < 7000 + NUM_SHARD_REPLICAS; i++) {
+      setupContainer(i, "shard1");
+    }
+    
+    setupContainer(3333, "shard2");
+    
+    zkClient = new SolrZkClient(zkServer.getZkAddress(),
+        AbstractZkTestCase.TIMEOUT);
+    log.info("####SETUP_END " + getName());
+    
+  }
+  
+  private void setupContainer(int port, String shard) throws IOException,
+      ParserConfigurationException, SAXException {
+    File data = new File(dataDir + File.separator + "data_" + port);
+    data.mkdirs();
+    
+    System.setProperty("hostPort", Integer.toString(port));
+    System.setProperty("shard", shard);
+    Initializer init = new CoreContainer.Initializer();
+    System.setProperty("solr.data.dir", data.getAbsolutePath());
+    Set<Integer> ports = shardPorts.get(shard);
+    if (ports == null) {
+      ports = new HashSet<Integer>();
+      shardPorts.put(shard, ports);
+    }
+    ports.add(port);
+    CoreContainer container = init.initialize();
+    containerMap.put(port, container);
+    System.clearProperty("hostPort");
+  }
+  
+  @Test
+  public void testSimpleSliceLeaderElection() throws Exception {
+    
+    ZkNodeProps props2 = new ZkNodeProps();
+    props2.put("configName", "conf1");
+    //printLayout(zkServer.getZkAddress());
+    
+    for (int i = 0; i < 4; i++) {
+      // who is the leader?
+      String leader = getLeader();
+      
+      Set<Integer> shard1Ports = shardPorts.get("shard1");
+      
+      int leaderPort = getLeaderPort(leader);
+      assertTrue(shard1Ports.toString(), shard1Ports.contains(leaderPort));
+      
+      shard1Ports.remove(leaderPort);
+      
+      // kill the leader
+      System.out.println("Killing " + leaderPort);
+      containerMap.get(leaderPort).shutdown();
+      
+      //printLayout(zkServer.getZkAddress());
+      
+      // wait a sec for new leader to register
+      Thread.sleep(1000);
+      
+      leader = getLeader();
+      int newLeaderPort = getLeaderPort(leader);
+      
+      if (leaderPort == newLeaderPort) {
+        fail("We didn't find a new leader! " + leaderPort + " was shutdown, but it's still showing as the leader");
+      }
+      
+      assertTrue("Could not find leader " + newLeaderPort + " in " + shard1Ports, shard1Ports.contains(newLeaderPort));
+    }
+    
+
+  }
+  
+  /** Pauses the leader's ZooKeeper connection, then checks that leadership moves away and later comes back. */
+  @Test
+  public void testLeaderElectionAfterClientTimeout() throws Exception {
+    // TODO: work out the best timing here...
+    System.setProperty("zkClientTimeout", "500");
+    // timeout the leader
+    String leader = getLeader();
+    int leaderPort = getLeaderPort(leader);
+    containerMap.get(leaderPort).getZkController().getZkClient().getSolrZooKeeper().pauseCnxn(2000);
+    
+    Thread.sleep(4000);
+    
+    // first leader should not be leader anymore; compare as ints, because assertNotSame would box
+    // both ports and compare object identity, which cannot fail for ports outside the Integer cache
+    assertTrue(leaderPort + " is still the leader", leaderPort != getLeaderPort(getLeader()));
+    
+    System.out.println("kill everyone");
+    // kill everyone but the first leader that should have reconnected by now
+    for (Map.Entry<Integer,CoreContainer> entry : containerMap.entrySet()) {
+      if (entry.getKey() != leaderPort) {
+        entry.getValue().shutdown();
+      }
+    }
+    
+    Thread.sleep(1000);
+    
+    // the original leader should be leader again now - everyone else is down
+    assertEquals(leaderPort, getLeaderPort(getLeader()));
+  }
+  
+  private String getLeader() throws Exception {
+    String leader = null;
+    int tries = 30;
+    while (true) {
+      List<String> leaderChildren = zkClient.getChildren(
+          "/collections/collection1/leader_elect/shard1/leader", null);
+      if (leaderChildren.size() > 0) {
+        assertEquals("There should only be one leader", 1,
+            leaderChildren.size());
+        leader = leaderChildren.get(0);
+        break;
+      } else {
+        if (tries-- == 0) {
+          printLayout(zkServer.getZkAddress());
+          fail("No registered leader was found");
+        }
+        Thread.sleep(1000);
+      }
+    }
+    return leader;
+  }
+  
+  /** Extracts the port number that the HOST pattern captures from a leader node name. */
+  private int getLeaderPort(String leader) {
+    Matcher m = HOST.matcher(leader);
+    if (!m.matches()) {
+      // say which name was unparseable instead of failing with an empty exception
+      throw new IllegalStateException("Could not find a port in leader node name: " + leader);
+    }
+    int leaderPort = Integer.parseInt(m.group(1));
+    System.out.println("The leader is:" + leaderPort);
+    return leaderPort;
+  }
+  
+  @Override
+  public void tearDown() throws Exception {
+    if (VERBOSE) {
+      printLayout(zkServer.getZkHost());
+    }
+    zkClient.close();
+    
+    for (CoreContainer cc : containerMap.values()) {
+      cc.shutdown();
+    }
+    zkServer.shutdown();
+    super.tearDown();
+    System.clearProperty("zkClientTimeout");
+    System.clearProperty("zkHost");
+    System.clearProperty("hostPort");
+    System.clearProperty("shard");
+    System.clearProperty("CLOUD_UPDATE_DELAY");
+    SolrConfig.severeErrors.clear();
+  }
+  
+  private void printLayout(String zkHost) throws Exception {
+    SolrZkClient zkClient = new SolrZkClient(zkHost, AbstractZkTestCase.TIMEOUT);
+    zkClient.printLayoutToStdOut();
+    zkClient.close();
+  }
+}

Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1176520&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Tue Sep 27 18:48:01 2011
@@ -0,0 +1,282 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.core.SolrConfig;
+import org.apache.zookeeper.KeeperException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LeaderElectionTest extends SolrTestCaseJ4 {
+  
+  static final int TIMEOUT = 10000;
+  private ZkTestServer server;
+  private SolrZkClient zkClient;
+  
+  private Map<Integer,Thread> seqToThread;
+  
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    initCore();
+  }
+  
+  /** Starts a ZkTestServer, cleans and recreates the Solr ZK node, and opens the test's ZK client. */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    String zkDir = dataDir.getAbsolutePath() + File.separator + "zookeeper/server1/data";
+    server = new ZkTestServer(zkDir);
+    server.run();
+    AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+    AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+    zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+    // ClientThreads put into this map concurrently while the test thread reads it
+    seqToThread = Collections.synchronizedMap(new HashMap<Integer,Thread>());
+  }
+  
+  /** Election participant: joins the shard1 election with its own ZK client, then sleeps until closed. */
+  class ClientThread extends Thread {
+    SolrZkClient zkClient;
+    private int nodeNumber;
+    // written by the thread executing run(), read by the test thread
+    private volatile int seq = -1;
+    
+    public ClientThread(int nodeNumber) throws Exception {
+      super("Thread-" + nodeNumber);
+      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+      this.nodeNumber = nodeNumber;
+    }
+    
+    @Override
+    public void run() {
+      try {
+        SliceLeaderElector elector = new SliceLeaderElector(zkClient);
+        elector.setupForSlice("shard1", "collection1");
+        seq = elector.joinElection("shard1", "collection1", Integer.toString(nodeNumber));
+        seqToThread.put(seq, this);
+        // sleep until close() interrupts us
+        Thread.sleep(Integer.MAX_VALUE);
+      } catch (Throwable e) {
+        // expected: close() interrupts the sleep, or closes the client while we are still electing
+      }
+    }
+    
+    public void close() throws InterruptedException {
+      zkClient.close();
+      interrupt(); // wakes run() out of its sleep; replaces the deprecated, unsafe Thread.stop()
+    }
+  }
+  
+  /** Starts 15 electors for shard1 and checks that the lowest surviving sequence number always leads. */
+  @Test
+  public void testElection() throws Exception {
+    // add a dummy slice, just for variance - call it shard2
+    SolrZkClient zkClient1 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+    SliceLeaderElector elector = new SliceLeaderElector(zkClient1);
+    elector.setupForSlice("shard2", "collection1");
+    elector.joinElection("shard2", "collection1", "dummynode1");
+    
+    SolrZkClient zkClient2 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+    SliceLeaderElector elector2 = new SliceLeaderElector(zkClient2);
+    elector2.setupForSlice("shard2", "collection1");
+    elector2.joinElection("shard2", "collection1", "dummynode2");
+    
+    List<ClientThread> threads = new ArrayList<ClientThread>();
+    for (int i = 0; i < 15; i++) {
+      ClientThread thread = new ClientThread(i);
+      threads.add(thread);
+    }
+    
+    for (Thread thread : threads) {
+      thread.start();
+    }
+    
+    // make sure the leader node is there from the start
+    try {
+      zkClient.makePath("/collections/collection1/leader_elect/shard1/leader");
+    } catch (KeeperException.NodeExistsException e) {
+      // that's fine
+    }
+    
+    // wait (up to 30s) until every thread has recorded its seq, so the lookups below cannot return null
+    for (int i = 0; i < 300 && seqToThread.size() < threads.size(); i++) {
+      Thread.sleep(100);
+    }
+    
+    int leaderThread = Integer.parseInt(getLeader());
+    
+    // whoever the leader is, should be the n_0 seq
+    assertEquals(0, threads.get(leaderThread).seq);
+    
+    // kill n_0, 1, 3 and 4
+    ((ClientThread) seqToThread.get(0)).close();
+    ((ClientThread) seqToThread.get(4)).close();
+    ((ClientThread) seqToThread.get(1)).close();
+    ((ClientThread) seqToThread.get(3)).close();
+    
+    Thread.sleep(50);
+    
+    leaderThread = Integer.parseInt(getLeader());
+    
+    // whoever the leader is, should be the n_2 seq
+    assertEquals(2, threads.get(leaderThread).seq);
+    
+    // kill n_5, 2, 6, 7, and 8
+    ((ClientThread) seqToThread.get(5)).close();
+    ((ClientThread) seqToThread.get(2)).close();
+    ((ClientThread) seqToThread.get(6)).close();
+    ((ClientThread) seqToThread.get(7)).close();
+    ((ClientThread) seqToThread.get(8)).close();
+    
+    Thread.sleep(50);
+    
+    leaderThread = Integer.parseInt(getLeader());
+    
+    // whoever the leader is, should be the n_9 seq
+    assertEquals(9, threads.get(leaderThread).seq);
+    
+    // cleanup any threads still running
+    for (ClientThread thread : threads) {
+      thread.close();
+    }
+    
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    // release the dummy shard2 clients too
+    zkClient1.close();
+    zkClient2.close();
+  }
+  
+  /** Launches 20 electors at random delays while a second thread closes random ones; makes no assertions. */
+  @Test
+  public void testStressElection() throws Exception {
+    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(100);
+    final List<ClientThread> threads = Collections
+        .synchronizedList(new ArrayList<ClientThread>());
+    
+    Thread scheduleThread = new Thread() {
+      @Override
+      public void run() {
+        for (int i = 0; i < 20; i++) {
+          int launchIn = random.nextInt(2000);
+          ClientThread thread;
+          try {
+            thread = new ClientThread(i);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          threads.add(thread);
+          scheduler.schedule(thread, launchIn, TimeUnit.MILLISECONDS);
+        }
+      }
+    };
+    
+    scheduleThread.start();
+    
+    Thread killThread = new Thread() {
+      @Override
+      public void run() {
+        
+        for (int i = 0; i < 1000; i++) {
+          try {
+            int j;
+            try {
+              j = random.nextInt(threads.size());
+            } catch(IllegalArgumentException e) {
+              continue;
+            }
+            try {
+              threads.get(j).close();
+            } catch (Exception e) {
+              
+            }
+            threads.remove(j);
+            Thread.sleep(10);
+            
+          } catch (Exception e) {
+
+          }
+        }
+      }
+    };
+    
+    killThread.start();
+    
+    scheduleThread.join();
+    killThread.join();
+    
+    Thread.sleep(1000);
+    
+    // the ClientThreads ran on the scheduler's workers and were never start()ed, so joining them
+    // is a no-op: close the surviving clients and interrupt the sleeping workers instead
+    for (ClientThread thread : threads) {
+      thread.close();
+    }
+    scheduler.shutdownNow();
+  }
+  
+  private String getLeader() throws Exception {
+    
+    String leader = null;
+    int tries = 30;
+    while (true) {
+      List<String> leaderChildren = zkClient.getChildren(
+          "/collections/collection1/leader_elect/shard1/leader", null);
+      if (leaderChildren.size() > 0) {
+        assertEquals("There should only be one leader", 1,
+            leaderChildren.size());
+        leader = leaderChildren.get(0);
+        break;
+      } else {
+        if (tries-- == 0) {
+          printLayout(server.getZkAddress());
+          fail("No registered leader was found");
+        }
+        Thread.sleep(1000);
+      }
+    }
+    return leader;
+  }
+  
+  @Override
+  public void tearDown() throws Exception {
+    zkClient.close();
+    server.shutdown();
+    SolrConfig.severeErrors.clear();
+    super.tearDown();
+  }
+  
+  private void printLayout(String zkHost) throws Exception {
+    SolrZkClient zkClient = new SolrZkClient(zkHost, AbstractZkTestCase.TIMEOUT);
+    zkClient.printLayoutToStdOut();
+    zkClient.close();
+  }
+}

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Tue Sep 27 18:48:01 2011
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.solr.SolrTestCaseJ4;
@@ -27,6 +28,7 @@ import org.apache.solr.common.cloud.Slic
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrConfig;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -87,7 +89,14 @@ public class ZkControllerTest extends So
       }
 
       zkController = new ZkController(server.getZkAddress(),
-          TIMEOUT, 1000, "localhost", "8983", "solr");
+          TIMEOUT, 1000, "localhost", "8983", "solr", new CurrentCoreDescriptorProvider() {
+            
+            @Override
+            public List<CoreDescriptor> getCurrentDescriptors() {
+              // do nothing
+              return null;
+            }
+          });
  
       zkController.getZkStateReader().updateCloudState(true);
       CloudState cloudInfo = zkController.getCloudState();
@@ -159,7 +168,14 @@ public class ZkControllerTest extends So
       }
       zkClient.close();
       ZkController zkController = new ZkController(server.getZkAddress(), TIMEOUT, TIMEOUT,
-          "localhost", "8983", "/solr");
+          "localhost", "8983", "/solr", new CurrentCoreDescriptorProvider() {
+            
+            @Override
+            public List<CoreDescriptor> getCurrentDescriptors() {
+              // do nothing
+              return null;
+            }
+          });
       try {
         String configName = zkController.readConfigName(COLLECTION_NAME);
         assertEquals(configName, actualConfigName);
@@ -186,7 +202,14 @@ public class ZkControllerTest extends So
       AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
 
       zkController = new ZkController(server.getZkAddress(),
-          TIMEOUT, 10000, "localhost", "8983", "/solr");
+          TIMEOUT, 10000, "localhost", "8983", "/solr", new CurrentCoreDescriptorProvider() {
+            
+            @Override
+            public List<CoreDescriptor> getCurrentDescriptors() {
+              // do nothing
+              return null;
+            }
+          });
 
       zkController.uploadToZK(getFile("solr/conf"),
           ZkController.CONFIGS_ZKNODE + "/config1");

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Tue Sep 27 18:48:01 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.zookeeper.SolrZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java Tue Sep 27 18:48:01 2011
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.zookeeper.SolrZooKeeper;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Tue Sep 27 18:48:01 2011
@@ -28,6 +28,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.SolrZooKeeper;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java Tue Sep 27 18:48:01 2011
@@ -20,6 +20,7 @@ package org.apache.solr.common.cloud;
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.zookeeper.SolrZooKeeper;
 import org.apache.zookeeper.Watcher;
 
 /**

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1176520&r1=1176519&r2=1176520&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Tue Sep 27 18:48:01 2011
@@ -50,6 +50,8 @@ public class ZkStateReader {
   
   private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
 
+  public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
+
   private static class ZKTF implements ThreadFactory {
     private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
     @Override

Added: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/zookeeper/SolrZooKeeper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/zookeeper/SolrZooKeeper.java?rev=1176520&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/zookeeper/SolrZooKeeper.java (added)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/zookeeper/SolrZooKeeper.java Tue Sep 27 18:48:01 2011
@@ -0,0 +1,41 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+// nocommit - we use this class to expose nasty stuff for tests
+public class SolrZooKeeper extends ZooKeeper {
+
+  public SolrZooKeeper(String connectString, int sessionTimeout,
+      Watcher watcher) throws IOException {
+    super(connectString, sessionTimeout, watcher);
+  }
+  
+  public ClientCnxn getConnection() {
+    return cnxn;
+  }
+  
+  /**
+   * Cause this ZooKeeper object to stop receiving from the ZooKeeperServer
+   * for the given number of milliseconds.
+   * @param ms the number of milliseconds to pause.
+   */
+  public void pauseCnxn(final long ms) {
+      new Thread() {
+          public void run() {
+              synchronized(cnxn) {
+                  try {
+                      try {
+                          ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+                      } catch (IOException e) {
+                          e.printStackTrace();
+                      }
+                      Thread.sleep(ms);
+                  } catch (InterruptedException e) {
+                  }
+              }
+          }
+      }.start();
+  }
+  
+}



Mime
View raw message