lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1022188 [1/4] - in /lucene/dev/trunk/solr: ./ lib/ src/common/org/apache/solr/common/cloud/ src/common/org/apache/solr/common/params/ src/java/org/apache/solr/cloud/ src/java/org/apache/solr/core/ src/java/org/apache/solr/handler/admin/ sr...
Date Wed, 13 Oct 2010 17:01:33 GMT
Author: markrmiller
Date: Wed Oct 13 17:01:13 2010
New Revision: 1022188

URL: http://svn.apache.org/viewvc?rev=1022188&view=rev
Log:
SOLR-1873: SolrCloud - added shared/central config and core/shard managment via zookeeper, built-in load balancing, and infrastructure for future SolrCloud work.

Added:
    lucene/dev/trunk/solr/lib/log4j-over-slf4j-1.5.5.jar   (with props)
    lucene/dev/trunk/solr/lib/zookeeper-3.3.1.jar   (with props)
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/
    lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
    lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/BasicZkTest.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/ZkControllerTest.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/ZkNodePropsTest.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/ZkTestServer.java
    lucene/dev/trunk/solr/src/test/test-files/solr/solr.xml
    lucene/dev/trunk/solr/src/webapp/web/admin/zookeeper.jsp
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java
    lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java
    lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/BaseDistributedSearchTestCase.java
    lucene/dev/trunk/solr/src/test/org/apache/solr/handler/component/DistributedTermsComponentTest.java
    lucene/dev/trunk/solr/src/webapp/src/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
    lucene/dev/trunk/solr/src/webapp/web/admin/index.jsp

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Oct 13 17:01:13 2010
@@ -291,6 +291,10 @@ New Features
 
 * SOLR-2010: Added ability to verify that spell checking collations have
    actual results in the index.  (James Dyer via gsingers)
+   
+* SOLR-1873: SolrCloud - added shared/central config and core/shard managment via zookeeper,
+  built-in load balancing, and infrastructure for future SolrCloud work. 
+  (yonik, Mark Miller)
 
 Optimizations
 ----------------------

Added: lucene/dev/trunk/solr/lib/log4j-over-slf4j-1.5.5.jar
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/lib/log4j-over-slf4j-1.5.5.jar?rev=1022188&view=auto
==============================================================================
Binary file - no diff available.

Propchange: lucene/dev/trunk/solr/lib/log4j-over-slf4j-1.5.5.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: lucene/dev/trunk/solr/lib/zookeeper-3.3.1.jar
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/lib/zookeeper-3.3.1.jar?rev=1022188&view=auto
==============================================================================
Binary file - no diff available.

Propchange: lucene/dev/trunk/solr/lib/zookeeper-3.3.1.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,144 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// immutable
+public class CloudState {
+  protected static Logger log = LoggerFactory.getLogger(CloudState.class);
+  
+  private final Map<String,Map<String,Slice>> collectionStates;
+  private final Set<String> liveNodes;
+  
+  public CloudState(Set<String> liveNodes, Map<String,Map<String,Slice>> collectionStates) {
+    this.liveNodes = liveNodes;
+    this.collectionStates = collectionStates;
+  }
+  
+  public Map<String,Slice> getSlices(String collection) {
+    Map<String,Slice> collectionState = collectionStates.get(collection);
+    if(collectionState == null) {
+      return null;
+    }
+    return Collections.unmodifiableMap(collectionState);
+  }
+  
+  public Set<String> getCollections() {
+    return Collections.unmodifiableSet(collectionStates.keySet());
+  }
+  
+  public Map<String,Map<String,Slice>> getCollectionStates() {
+    return Collections.unmodifiableMap(collectionStates);
+  }
+  
+  public Set<String> getLiveNodes() {
+    return Collections.unmodifiableSet(liveNodes);
+  }
+  
+  public boolean liveNodesContain(String name) {
+    return liveNodes.contains(name);
+  }
+  
+  public static CloudState buildCloudState(SolrZkClient zkClient, CloudState oldCloudState, boolean onlyLiveNodes) throws KeeperException, InterruptedException, IOException {
+    Map<String,Map<String,Slice>> collectionStates;
+    if (!onlyLiveNodes) {
+      List<String> collections = zkClient.getChildren(
+          ZkStateReader.COLLECTIONS_ZKNODE, null);
+
+      collectionStates = new HashMap<String,Map<String,Slice>>();
+      for (String collection : collections) {
+        String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/"
+            + collection + ZkStateReader.SHARDS_ZKNODE;
+        List<String> shardIdNames;
+        try {
+          shardIdNames = zkClient.getChildren(shardIdPaths, null);
+        } catch (KeeperException.NoNodeException e) {
+          // node is not valid currently
+          continue;
+        }
+        Map<String,Slice> slices = new HashMap<String,Slice>();
+        for (String shardIdZkPath : shardIdNames) {
+          Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths
+              + "/" + shardIdZkPath);
+          Slice slice = new Slice(shardIdZkPath, shardsMap);
+          slices.put(shardIdZkPath, slice);
+        }
+        collectionStates.put(collection, slices);
+      }
+    } else {
+      collectionStates = oldCloudState.getCollectionStates();
+    }
+    
+    CloudState cloudInfo = new CloudState(getLiveNodes(zkClient), collectionStates);
+    
+    return cloudInfo;
+  }
+  
+  /**
+   * @param zkClient
+   * @param shardsZkPath
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  private static Map<String,ZkNodeProps> readShards(SolrZkClient zkClient, String shardsZkPath)
+      throws KeeperException, InterruptedException, IOException {
+
+    Map<String,ZkNodeProps> shardNameToProps = new HashMap<String,ZkNodeProps>();
+
+    if (zkClient.exists(shardsZkPath, null) == null) {
+      throw new IllegalStateException("Cannot find zk shards node that should exist:"
+          + shardsZkPath);
+    }
+
+    List<String> shardZkPaths = zkClient.getChildren(shardsZkPath, null);
+    
+    for(String shardPath : shardZkPaths) {
+      byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
+          null);
+      
+      ZkNodeProps props = new ZkNodeProps();
+      props.load(data);
+      shardNameToProps.put(shardPath, props);
+    }
+
+    return Collections.unmodifiableMap(shardNameToProps);
+  }
+  
+  private static Set<String> getLiveNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    List<String> liveNodes = zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null);
+    Set<String> liveNodesSet = new HashSet<String>(liveNodes.size());
+    liveNodesSet.addAll(liveNodes);
+
+    return liveNodesSet;
+  }
+
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,140 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ConnectionManager implements Watcher {
+  protected static final Logger log = LoggerFactory
+      .getLogger(ConnectionManager.class);
+
+  private final String name;
+  private CountDownLatch clientConnected;
+  private KeeperState state;
+  private boolean connected;
+
+  private ZkClientConnectionStrategy connectionStrategy;
+
+  private String zkServerAddress;
+
+  private int zkClientTimeout;
+
+  private SolrZkClient client;
+
+  private OnReconnect onReconnect;
+
+  public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
+    this.name = name;
+    this.client = client;
+    this.connectionStrategy = strat;
+    this.zkServerAddress = zkServerAddress;
+    this.zkClientTimeout = zkClientTimeout;
+    this.onReconnect = onConnect;
+    reset();
+  }
+
+  private synchronized void reset() {
+    clientConnected = new CountDownLatch(1);
+    state = KeeperState.Disconnected;
+    connected = false;
+  }
+
+  public synchronized void process(WatchedEvent event) {
+    if (log.isInfoEnabled()) {
+      log.info("Watcher " + this + " name:" + name + " got event " + event
+          + " path:" + event.getPath() + " type:" + event.getType());
+    }
+
+    state = event.getState();
+    if (state == KeeperState.SyncConnected) {
+      connected = true;
+      clientConnected.countDown();
+    } else if (state == KeeperState.Expired) {
+      
+      connected = false;
+      log.info("Attempting to reconnect to ZooKeeper...");
+
+      try {
+        connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
+          @Override
+          public void update(SolrZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
+           waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
+           client.updateKeeper(keeper);
+           if(onReconnect != null) {
+             onReconnect.command();
+           }
+           ConnectionManager.this.connected = true;
+          }
+        });
+      } catch (Exception e) {
+        log.error("", e);
+      }
+
+      log.info("Connected:" + connected);
+    } else if (state == KeeperState.Disconnected) {
+      // ZooKeeper client will recover when it can
+      // TODO: this needs to be investigated more
+      connected = false;
+    } else {
+      connected = false;
+    }
+    notifyAll();
+  }
+
+  public synchronized boolean isConnected() {
+    return connected;
+  }
+
+  public synchronized KeeperState state() {
+    return state;
+  }
+
+  public synchronized void waitForConnected(long waitForConnection)
+      throws InterruptedException, TimeoutException, IOException {
+    long expire = System.currentTimeMillis() + waitForConnection;
+    long left = waitForConnection;
+    while (!connected && left > 0) {
+      wait(left);
+      left = expire - System.currentTimeMillis();
+    }
+    if (!connected) {
+      throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
+    }
+  }
+
+  public synchronized void waitForDisconnected(long timeout)
+      throws InterruptedException, TimeoutException {
+    long expire = System.currentTimeMillis() + timeout;
+    long left = timeout;
+    while (connected && left > 0) {
+      wait(left);
+      left = expire - System.currentTimeMillis();
+    }
+    if (connected) {
+      throw new TimeoutException("Did not disconnect");
+    }
+  }
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,74 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: improve backoff retry impl
+ */
+public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
+
+  private static Logger log = LoggerFactory.getLogger(DefaultConnectionStrategy.class);
+  private ScheduledExecutorService executor;
+  
+  @Override
+  public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
+    updater.update(new SolrZooKeeper(serverAddress, timeout, watcher));
+  }
+
+  @Override
+  public void reconnect(final String serverAddress, final int zkClientTimeout,
+      final Watcher watcher, final ZkUpdate updater) throws IOException {
+    log.info("Starting reconnect to ZooKeeper attempts ...");
+    executor = Executors.newScheduledThreadPool(1);
+    executor.schedule(new Runnable() {
+      private int delay = 1000;
+      public void run() {
+        log.info("Attempting the connect...");
+        boolean connected = false;
+        try {
+          updater.update(new SolrZooKeeper(serverAddress, zkClientTimeout, watcher));
+          log.info("Reconnected to ZooKeeper");
+          connected = true;
+        } catch (Exception e) {
+          log.error("", e);
+          log.info("Reconnect to ZooKeeper failed");
+        }
+        if(connected) {
+          executor.shutdownNow();
+        } else {
+          if(delay < 240000) {
+            delay = delay * 2;
+          }
+          executor.schedule(this, delay, TimeUnit.MILLISECONDS);
+        }
+        
+      }
+    }, 1000, TimeUnit.MILLISECONDS);
+  }
+
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,22 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public interface OnReconnect {
+  public void command();
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,41 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+
+// immutable
+public class Slice {
+  private final Map<String,ZkNodeProps> shards;
+  private final String name;
+
+  public Slice(String name, Map<String,ZkNodeProps> shards) {
+    this.shards = shards;
+    this.name = name;
+  }
+  
+  public Map<String,ZkNodeProps> getShards() {
+    return Collections.unmodifiableMap(shards);
+  }
+  
+  public String getName() {
+    return name;
+  }
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,493 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * All Solr ZooKeeper interactions should go through this class rather than
+ * ZooKeeper. This class handles synchronous connects and reconnections.
+ *
+ */
+public class SolrZkClient {
+  static final String NEWL = System.getProperty("line.separator");
+
+  static final int DEFAULT_CLIENT_CONNECT_TIMEOUT = 30000;
+
+  private static final Logger log = LoggerFactory
+      .getLogger(SolrZkClient.class);
+
+  private ConnectionManager connManager;
+
+  private volatile SolrZooKeeper keeper;
+  
+  /**
+   * @param zkServerAddress
+   * @param zkClientTimeout
+   * @throws InterruptedException
+   * @throws TimeoutException
+   * @throws IOException
+   */
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout) throws InterruptedException, TimeoutException, IOException {
+    this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
+  }
+  
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) throws InterruptedException, TimeoutException, IOException {
+    this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), onReonnect, zkClientConnectTimeout);
+  }
+
+  /**
+   * @param zkServerAddress
+   * @param zkClientTimeout
+   * @param strat
+   * @param onReconnect
+   * @param clientConnectTimeout
+   * @throws InterruptedException
+   * @throws TimeoutException
+   * @throws IOException
+   */
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout,
+      ZkClientConnectionStrategy strat, final OnReconnect onReconnect) throws InterruptedException,
+      TimeoutException, IOException {
+    this(zkServerAddress, zkClientTimeout, strat, onReconnect, DEFAULT_CLIENT_CONNECT_TIMEOUT);
+  }
+
+  /**
+   * @param zkServerAddress
+   * @param zkClientTimeout
+   * @param strat
+   * @param onReconnect
+   * @param clientConnectTimeout
+   * @throws InterruptedException
+   * @throws TimeoutException
+   * @throws IOException
+   */
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout,
+      ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,
+      TimeoutException, IOException {
+    connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+        + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
+    strat.connect(zkServerAddress, zkClientTimeout, connManager,
+        new ZkUpdate() {
+          @Override
+          public void update(SolrZooKeeper zooKeeper) {
+            SolrZooKeeper oldKeeper = keeper;
+            keeper = zooKeeper;
+            if (oldKeeper != null) {
+              try {
+                oldKeeper.close();
+              } catch (InterruptedException e) {
+                // Restore the interrupted status
+                Thread.currentThread().interrupt();
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              }
+            }
+          }
+        });
+    connManager.waitForConnected(clientConnectTimeout);
+  }
+
+  /**
+   * @return true if client is connected
+   */
+  public boolean isConnected() {
+    return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
+  }
+  
+  /**
+   * @param path
+   * @param version
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public void delete(final String path, int version)
+      throws InterruptedException, KeeperException {
+    keeper.delete(path, version);
+  }
+
+  /**
+   * Return the stat of the node of the given path. Return null if no such a
+   * node exists.
+   * <p>
+   * If the watch is non-null and the call is successful (no exception is thrown),
+   * a watch will be left on the node with the given path. The watch will be
+   * triggered by a successful operation that creates/delete the node or sets
+   * the data on the node.
+   *
+   * @param path the node path
+   * @param watcher explicit watcher
+   * @return the stat of the node of the given path; return null if no such a
+   *         node exists.
+   * @throws KeeperException If the server signals an error
+   * @throws InterruptedException If the server transaction is interrupted.
+   * @throws IllegalArgumentException if an invalid path is specified
+   */
+  public Stat exists(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    return keeper.exists(path, watcher);
+  }
+  
+  /**
+   * @param path
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public boolean exists(final String path)
+      throws KeeperException, InterruptedException {
+    return keeper.exists(path, null) != null;
+  }
+
+  /**
+   * @param path
+   * @param data
+   * @param acl
+   * @param createMode
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public String create(final String path, byte data[], List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    return keeper.create(path, data, acl, createMode);
+  }
+
+  /**
+   * @param path
+   * @param watcher
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public List<String> getChildren(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    return keeper.getChildren(path, watcher);
+  }
+
+  /**
+   * @param path
+   * @param watcher
+   * @param stat
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] getData(final String path, Watcher watcher, Stat stat)
+      throws KeeperException, InterruptedException {
+    return keeper.getData(path, watcher, stat);
+  }
+
+  /**
+   * @param path
+   * @param data
+   * @param version
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public Stat setData(final String path, byte data[], int version)
+      throws KeeperException, InterruptedException {
+    return keeper.setData(path, data, version);
+  }
+  
+  /**
+   * 
+   * @param path
+   * @param data
+   * @param watcher
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public String create(String path, byte[] data, CreateMode createMode) throws KeeperException, InterruptedException {
+
+    String zkPath = keeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
+
+    return zkPath;
+  }
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+   * group, node exist, each will be created.
+   * 
+   * @param path
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void makePath(String path) throws KeeperException,
+      InterruptedException {
+    makePath(path, null, CreateMode.PERSISTENT);
+  }
+  
+  public void makePath(String path, CreateMode createMode) throws KeeperException,
+      InterruptedException {
+    makePath(path, null, createMode);
+  }
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * @param path
+   * @param data to set on the last zkNode
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void makePath(String path, byte[] data) throws KeeperException,
+      InterruptedException {
+    makePath(path, data, CreateMode.PERSISTENT);
+  }
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+   * group, node exist, each will be created.
+   * 
+   * @param path
+   * @param data to set on the last zkNode
+   * @param createMode
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void makePath(String path, byte[] data, CreateMode createMode)
+      throws KeeperException, InterruptedException {
+    makePath(path, data, createMode, null);
+  }
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+   * group, node exist, each will be created.
+   * 
+   * @param path
+   * @param data to set on the last zkNode
+   * @param createMode
+   * @param watcher
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void makePath(String path, byte[] data, CreateMode createMode,
+      Watcher watcher) throws KeeperException, InterruptedException {
+    makePath(path, data, createMode, watcher, false);
+  }
+  
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+   * group, node exist, each will be created.
+   * 
+   * @param path
+   * @param data to set on the last zkNode
+   * @param createMode
+   * @param watcher
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void makePath(String path, byte[] data, CreateMode createMode,
+      Watcher watcher, boolean failOnExists) throws KeeperException, InterruptedException {
+    if (log.isInfoEnabled()) {
+      log.info("makePath: " + path);
+    }
+    
+    if (path.startsWith("/")) {
+      path = path.substring(1, path.length());
+    }
+    String[] paths = path.split("/");
+    StringBuilder sbPath = new StringBuilder();
+    for (int i = 0; i < paths.length; i++) {
+      byte[] bytes = null;
+      String pathPiece = paths[i];
+      sbPath.append("/" + pathPiece);
+      String currentPath = sbPath.toString();
+      Object exists = exists(currentPath, watcher);
+      if (exists == null || ((i == paths.length -1) && failOnExists)) {
+        CreateMode mode = CreateMode.PERSISTENT;
+        if (i == paths.length - 1) {
+          mode = createMode;
+          bytes = data;
+        }
+        keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+        if(i == paths.length -1) {
+          // set new watch
+          exists(currentPath, watcher);
+        }
+      } else if (i == paths.length - 1) {
+        // TODO: version ? for now, don't worry about race
+        setData(currentPath, data, -1);
+        // set new watch
+        exists(currentPath, watcher);
+      }
+    }
+  }
+
+  /**
+   * @param zkPath
+   * @param createMode
+   * @param watcher
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void makePath(String zkPath, CreateMode createMode, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    makePath(zkPath, null, createMode, watcher);
+  }
+
+  /**
+   * Write data to ZooKeeper.
+   * 
+   * @param path
+   * @param data
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void setData(String path, byte[] data) throws KeeperException,
+      InterruptedException {
+
+    makePath(path);
+
+    Object exists = exists(path, null);
+    if (exists != null) {
+      setData(path, data, -1);
+    } else {
+      create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+  }
+
+  /**
+   * Write file to ZooKeeper - default system encoding used.
+   * 
+   * @param path path to upload file to e.g. /solr/conf/solrconfig.xml
+   * @param file path to file to be uploaded
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void setData(String path, File file) throws IOException,
+      KeeperException, InterruptedException {
+    if (log.isInfoEnabled()) {
+      log.info("Write to ZooKeepeer " + file.getAbsolutePath() + " to " + path);
+    }
+
+    String data = FileUtils.readFileToString(file);
+    setData(path, data.getBytes("UTF-8"));
+  }
+
+  /**
+   * Fills string with printout of current ZooKeeper layout.
+   * 
+   * @param path
+   * @param indent
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void printLayout(String path, int indent, StringBuilder string)
+      throws KeeperException, InterruptedException {
+    byte[] data = getData(path, null, null);
+    List<String> children = getChildren(path, null);
+    StringBuilder dent = new StringBuilder();
+    for (int i = 0; i < indent; i++) {
+      dent.append(" ");
+    }
+    string.append(dent + path + " (" + children.size() + ")" + NEWL);
+    if (data != null) {
+      try {
+        String dataString = new String(data, "UTF-8");
+        if (!path.endsWith(".txt") && !path.endsWith(".xml")) {
+          string.append(dent + "DATA:\n" + dent + "    "
+              + dataString.replaceAll("\n", "\n" + dent + "    ") + NEWL);
+        } else {
+          string.append(dent + "DATA: ...supressed..." + NEWL);
+        }
+      } catch (UnsupportedEncodingException e) {
+        // can't happen - UTF-8
+        throw new RuntimeException(e);
+      }
+    }
+
+    for (String child : children) {
+      if (!child.equals("quota")) {
+        printLayout(path + (path.equals("/") ? "" : "/") + child, indent + 1,
+            string);
+      }
+    }
+
+  }
+
+  /**
+   * Prints current ZooKeeper layout to stdout.
+   * 
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void printLayoutToStdOut() throws KeeperException,
+      InterruptedException {
+    StringBuilder sb = new StringBuilder();
+    printLayout("/", 0, sb);
+    System.out.println(sb.toString());
+  }
+
+  /**
+   * @throws InterruptedException
+   */
+  public void close() throws InterruptedException {
+    keeper.close();
+  }
+
+  /**
+   * Allows package private classes to update volatile ZooKeeper.
+   * 
+   * @param keeper
+   * @throws InterruptedException 
+   */
+  void updateKeeper(SolrZooKeeper keeper) throws InterruptedException {
+   SolrZooKeeper oldKeeper = this.keeper;
+   this.keeper = keeper;
+   if (oldKeeper != null) {
+     oldKeeper.close();
+   }
+  }
+  
+  public SolrZooKeeper getSolrZooKeeper() {
+    return keeper;
+  }
+
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,37 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.zookeeper.ClientCnxn;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class SolrZooKeeper extends ZooKeeper {
+
+  public SolrZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
+      throws IOException {
+    super(connectString, sessionTimeout, watcher);
+  }
+  
+  public ClientCnxn getConnection() {
+    return cnxn;
+  }
+
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,36 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ *
+ */
+public abstract class ZkClientConnectionStrategy {
+  public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+  public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+  
+  public static abstract class ZkUpdate {
+    public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
+  }
+  
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,58 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Map.Entry;
+
+public class ZkNodeProps extends HashMap<String,String> {
+
+  private static final long serialVersionUID = 1L;
+
+  public void load(byte[] bytes) throws IOException {
+    String stringRep = new String(bytes, "UTF-8");
+    String[] lines = stringRep.split("\n");
+    for (String line : lines) {
+      int sepIndex = line.indexOf('=');
+      String key = line.substring(0, sepIndex);
+      String value = line.substring(sepIndex + 1, line.length());
+      put(key, value);
+    }
+  }
+
+  public byte[] store() throws IOException {
+    StringBuilder sb = new StringBuilder();
+    Set<Entry<String,String>> entries = entrySet();
+    for(Entry<String,String> entry : entries) {
+      sb.append(entry.getKey() + "=" + entry.getValue() + "\n");
+    }
+    return sb.toString().getBytes("UTF-8");
+  }
+  
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    Set<Entry<String,String>> entries = entrySet();
+    for(Entry<String,String> entry : entries) {
+      sb.append(entry.getKey() + "=" + entry.getValue() + "\n");
+    }
+    return sb.toString();
+  }
+
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,400 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkStateReader {
+  private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
+  
+  public static final String COLLECTIONS_ZKNODE = "/collections";
+  public static final String URL_PROP = "url";
+  public static final String NODE_NAME = "node_name";
+  public static final String SHARDS_ZKNODE = "/shards";
+  public static final String LIVE_NODES_ZKNODE = "/live_nodes";
+  
+  private volatile CloudState cloudState  = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
+  
+  private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
+
+  private static class ZKTF implements ThreadFactory {
+    private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread td = new Thread(tg, r);
+      td.setDaemon(true);
+      return td;
+    }
+  }
+  private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1, new ZKTF());
+
+  private boolean cloudStateUpdateScheduled;
+
+  private SolrZkClient zkClient;
+  
+  private boolean closeClient = false;
+  
+  public ZkStateReader(SolrZkClient zkClient) {
+    this.zkClient = zkClient;
+  }
+  
+  public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
+    closeClient = true;
+    zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
+        // on reconnect, reload cloud info
+        new OnReconnect() {
+
+          public void command() {
+            try {
+              makeCollectionsNodeWatches();
+              makeShardsWatches(true);
+              updateCloudState(false);
+            } catch (KeeperException e) {
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            } catch (IOException e) {
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            }
+
+          }
+        });
+  }
+  
+  // load and publish a new CollectionInfo
+  public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
+      IOException {
+    updateCloudState(immediate, false);
+  }
+  
+  // load and publish a new CollectionInfo
+  public void updateLiveNodes() throws KeeperException, InterruptedException,
+      IOException {
+    updateCloudState(true, true);
+  }
+  
+  // load and publish a new CollectionInfo
+  private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
+      IOException {
+
+    // TODO: - possibly: incremental update rather than reread everything
+    
+    // build immutable CloudInfo
+    
+    if(immediate) {
+      if(!onlyLiveNodes) {
+        log.info("Updating cloud state from ZooKeeper... ");
+      } else {
+        log.info("Updating live nodes from ZooKeeper... ");
+      }
+      CloudState cloudState;
+      cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
+      // update volatile
+      this.cloudState = cloudState;
+    } else {
+      if(cloudStateUpdateScheduled) {
+        log.info("Cloud state update for ZooKeeper already scheduled");
+        return;
+      }
+      log.info("Scheduling cloud state update from ZooKeeper...");
+      cloudStateUpdateScheduled = true;
+      updateCloudExecutor.schedule(new Runnable() {
+        
+        public void run() {
+          log.info("Updating cloud state from ZooKeeper...");
+          synchronized (getUpdateLock()) {
+            cloudStateUpdateScheduled = false;
+            CloudState cloudState;
+            try {
+              cloudState = CloudState.buildCloudState(zkClient,
+                  ZkStateReader.this.cloudState, onlyLiveNodes);
+            } catch (KeeperException e) {
+              if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                return;
+              }
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (IOException e) {
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            }
+            // update volatile
+            ZkStateReader.this.cloudState = cloudState;
+          }
+        }
+      }, CLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+    }
+
+  }
+  
+  public void makeShardZkNodeWatches(boolean makeWatchesForReconnect) throws KeeperException, InterruptedException {
+    CloudState cloudState = getCloudState();
+    
+    Set<String> knownCollections = cloudState.getCollections();
+    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+
+    for(final String collection : collections) {
+      if(makeWatchesForReconnect || !knownCollections.contains(collection)) {
+        log.info("Found new collection:" + collection);
+        Watcher watcher = new Watcher() {
+          public void process(WatchedEvent event) {
+            log.info("Detected changed ShardId in collection:" + collection);
+            try {
+              makeShardsWatches(collection, false);
+              updateCloudState(false);
+            } catch (KeeperException e) {
+              if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                return;
+              }
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (IOException e) {
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            }
+          }
+        };
+        boolean madeWatch = true;
+        String shardZkNode = COLLECTIONS_ZKNODE + "/" + collection
+            + SHARDS_ZKNODE;
+        for (int i = 0; i < 5; i++) {
+          try {
+            zkClient.getChildren(shardZkNode, watcher);
+          } catch (KeeperException.NoNodeException e) {
+            // most likely, the collections node has been created, but not the
+            // shards node yet -- pause and try again
+            madeWatch = false;
+            if (i == 4) {
+              log.error("Could not set shards zknode watch, because the zknode does not exist:" + shardZkNode);
+              break;
+            }
+            Thread.sleep(100);
+          }
+          if (madeWatch) {
+            log.info("Made shard watch:" + shardZkNode);
+            break;
+          }
+        }
+      }
+    }
+  }
+  
+  public void makeShardsWatches(final String collection, boolean makeWatchesForReconnect) throws KeeperException,
+      InterruptedException {
+    if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
+      List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
+          + collection + SHARDS_ZKNODE, null);
+      CloudState cloudState = getCloudState();
+      Set<String> knownShardIds;
+      Map<String,Slice> slices = cloudState.getSlices(collection);
+      if (slices != null) {
+        knownShardIds = slices.keySet();
+      } else {
+        knownShardIds = new HashSet<String>(0);
+      }
+      for (final String shardId : shardIds) {
+        if (makeWatchesForReconnect || !knownShardIds.contains(shardId)) {
+          zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
+              + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
+
+            public void process(WatchedEvent event) {
+              log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
+              try {
+                updateCloudState(false);
+              } catch (KeeperException e) {
+                if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                  return;
+                }
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              } catch (InterruptedException e) {
+                // Restore the interrupted status
+                Thread.currentThread().interrupt();
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              } catch (IOException e) {
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              }
+            }
+          });
+        }
+      }
+    }
+  }
+  
+  /**
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void makeShardsWatches(boolean makeWatchesForReconnect) throws KeeperException, InterruptedException {
+    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+    for (final String collection : collections) {
+      makeShardsWatches(collection, makeWatchesForReconnect);
+    }
+  }
+  
+  /**
+   * @return information about the cluster from ZooKeeper
+   */
+  public CloudState getCloudState() {
+    return cloudState;
+  }
+  
+  public Object getUpdateLock() {
+    return this;
+  }
+
+  public void close() {
+    if (closeClient) {
+      try {
+        zkClient.close();
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      }
+    }
+  }
+
+  public void makeCollectionsNodeWatches() throws KeeperException, InterruptedException {
+    log.info("Start watching collections zk node for changes");
+    zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+
+      public void process(WatchedEvent event) {
+          try {
+
+            log.info("Detected a new or removed collection");
+            synchronized (getUpdateLock()) {
+              makeShardZkNodeWatches(false);
+              updateCloudState(false);
+            }
+            // re-watch
+            String path = event.getPath();
+            if (path != null) {
+              zkClient.getChildren(path, this);
+            }
+          } catch (KeeperException e) {
+            if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+              return;
+            }
+            log.error("", e);
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (InterruptedException e) {
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+            log.error("", e);
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (IOException e) {
+            log.error("", e);
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          }
+
+      }});
+    
+    zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+
+      public void process(WatchedEvent event) {
+        if(event.getType() !=  EventType.NodeDataChanged) {
+          return;
+        }
+        log.info("Notified of CloudState change");
+        try {
+          synchronized (getUpdateLock()) {
+            makeShardZkNodeWatches(false);
+            updateCloudState(false);
+          }
+          zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, this);
+        } catch (KeeperException e) {
+          if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+            return;
+          }
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (InterruptedException e) {
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (IOException e) {
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        }
+        
+      }});
+    
+  }
+}

Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,33 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.solr.common.SolrException;
+
+public class ZooKeeperException extends SolrException {
+
+  public ZooKeeperException(ErrorCode code, String msg, Throwable th) {
+    super(code, msg, th);
+  }
+  
+  public ZooKeeperException(ErrorCode code, String msg) {
+    super(code, msg);
+  }
+
+}

Modified: lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java Wed Oct 13 17:01:13 2010
@@ -59,6 +59,12 @@ public interface CoreAdminParams 
    * The directories are specified by multiple indexDir parameters. */
   public final static String INDEX_DIR = "indexDir";
 
+  /** The collection name in solr cloud */
+  public final static String COLLECTION = "collection";
+
+  /** The shard id in solr cloud */
+  public final static String SHARD = "shard";
+
   public enum CoreAdminAction {
     STATUS,  
     LOAD,

Added: lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java (added)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,52 @@
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.params.SolrParams;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class CloudDescriptor {
+  private String shardId;
+  private String collectionName;
+  private SolrParams params;
+
+  public void setShardId(String shardId) {
+    this.shardId = shardId;
+  }
+  
+  public String getShardId() {
+    return shardId;
+  }
+  
+  public String getCollectionName() {
+    return collectionName;
+  }
+
+  public void setCollectionName(String collectionName) {
+    this.collectionName = collectionName;
+  }
+
+  /** Optional parameters that can change how a core is created. */
+  public SolrParams getParams() {
+    return params;
+  }
+
+  public void setParams(SolrParams params) {
+    this.params = params;
+  }
+}

Added: lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java (added)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,477 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.slf4j.LoggerFactory;
+
+
+public class SolrZkServer {
+  static org.slf4j.Logger log = LoggerFactory.getLogger(SolrZkServer.class);
+  
+  String zkRun;
+  String zkHost;
+  String solrHome;
+  String solrPort;
+  Properties props;
+  SolrZkServerProps zkProps;
+
+  private Thread zkThread;  // the thread running a zookeeper server, only if zkRun is set
+
+  public SolrZkServer(String zkRun, String zkHost, String solrHome, String solrPort) {
+    this.zkRun = zkRun;
+    this.zkHost = zkHost;
+    this.solrHome = solrHome;
+    this.solrPort = solrPort;
+  }
+
+  public String getClientString() {
+    if (zkHost != null) return zkHost;
+    
+    if (zkProps == null) return null;
+
+    // if the string wasn't passed as zkHost, then use the standalone server we started
+    if (zkRun == null) return null;
+    return "localhost:" + zkProps.getClientPortAddress().getPort();
+  }
+
+  public void parseConfig() {
+    if (zkProps == null) {
+      zkProps = new SolrZkServerProps();
+      // set default data dir
+      // TODO: use something based on IP+port???  support ensemble all from same solr home?
+      zkProps.setDataDir(solrHome + '/' + "zoo_data");
+      zkProps.zkRun = zkRun;
+      zkProps.solrPort = solrPort;
+    }
+    
+    try {
+      props = SolrZkServerProps.getProperties(solrHome + '/' + "zoo.cfg");
+      SolrZkServerProps.injectServers(props, zkRun, zkHost);
+      zkProps.parseProperties(props);
+      if (zkProps.getClientPortAddress() == null) {
+        zkProps.setClientPort(Integer.parseInt(solrPort)+1000);
+      }
+    } catch (QuorumPeerConfig.ConfigException e) {
+      if (zkRun != null)
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    } catch (IOException e) {
+      if (zkRun != null)
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  public Map<Long, QuorumPeer.QuorumServer> getServers() {
+    return zkProps.getServers();
+  }
+
+  public void start() {
+    if (zkRun == null) return;
+
+    zkThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          if (zkProps.getServers().size() > 1) {
+            QuorumPeerMain zkServer = new QuorumPeerMain();
+            zkServer.runFromConfig(zkProps);
+          } else {
+            ServerConfig sc = new ServerConfig();
+            sc.readFrom(zkProps);
+            ZooKeeperServerMain zkServer = new ZooKeeperServerMain();
+            zkServer.runFromConfig(sc);
+          }
+          log.info("ZooKeeper Server exited.");
+        } catch (Throwable e) {
+          log.error("ZooKeeper Server ERROR", e);
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        }
+      }
+    };
+
+    if (zkProps.getServers().size() > 1) {
+      log.info("STARTING EMBEDDED ENSEMBLE ZOOKEEPER SERVER at port " + zkProps.getClientPortAddress().getPort());
+    } else {
+      log.info("STARTING EMBEDDED STANDALONE ZOOKEEPER SERVER at port " + zkProps.getClientPortAddress().getPort());
+    }
+
+    zkThread.setDaemon(true);
+    zkThread.start();
+    try {
+      Thread.sleep(500); // pause for ZooKeeper to start
+    } catch (Exception e) {
+      log.error("STARTING ZOOKEEPER", e);
+    }
+  }
+
+  public void stop() {
+    if (zkRun == null) return;
+    zkThread.interrupt();
+  }
+}
+
+
+
+
+// Allows us to set a default for the data dir before parsing
+// zoo.cfg (which validates that there is a dataDir)
+class SolrZkServerProps extends QuorumPeerConfig {
+  protected static org.slf4j.Logger LOG = LoggerFactory.getLogger(QuorumPeerConfig.class);
+
+  String solrPort; // port that Solr is listening on
+  String zkRun;
+
+  /**
+   * Parse a ZooKeeper configuration file
+   * @param path the patch of the configuration file
+   * @throws ConfigException error processing configuration
+   */
+  public static Properties getProperties(String path) throws ConfigException {
+    File configFile = new File(path);
+
+    LOG.info("Reading configuration from: " + configFile);
+
+    try {
+      if (!configFile.exists()) {
+        throw new IllegalArgumentException(configFile.toString()
+            + " file is missing");
+      }
+
+      Properties cfg = new Properties();
+      FileInputStream in = new FileInputStream(configFile);
+      try {
+        cfg.load(in);
+      } finally {
+        in.close();
+      }
+
+      return cfg;
+
+    } catch (IOException e) {
+      throw new ConfigException("Error processing " + path, e);
+    } catch (IllegalArgumentException e) {
+      throw new ConfigException("Error processing " + path, e);
+    }
+  }
+
+
+  // Adds server.x if they don't exist, based on zkHost if it does exist.
+  // Given zkHost=localhost:1111,localhost:2222 this will inject
+  // server.0=localhost:1112:1113
+  // server.1=localhost:2223:2224
+  public static void injectServers(Properties props, String zkRun, String zkHost) {
+
+    // if clientPort not already set, use zkRun
+    if (zkRun != null && props.getProperty("clientPort")==null) {
+      int portIdx = zkRun.lastIndexOf(':');
+      if (portIdx > 0) {
+        String portStr = zkRun.substring(portIdx+1);
+        props.setProperty("clientPort", portStr);
+      }
+    }
+
+    boolean hasServers = hasServers(props);
+
+    if (!hasServers && zkHost != null) {
+      int alg = Integer.parseInt(props.getProperty("electionAlg","3").trim());
+      String[] hosts = zkHost.split(",");
+      int serverNum = 0;
+      for (String hostAndPort : hosts) {
+        hostAndPort = hostAndPort.trim();
+        int portIdx = hostAndPort.lastIndexOf(':');
+        String clientPortStr = hostAndPort.substring(portIdx+1);
+        int clientPort = Integer.parseInt(clientPortStr);
+        String host = hostAndPort.substring(0,portIdx);
+
+        String serverStr = host + ':' + (clientPort+1);
+        // zk leader election algorithms other than 0 need an extra port for leader election.
+        if (alg != 0) {
+          serverStr = serverStr + ':' + (clientPort+2);
+        }
+
+        props.setProperty("server."+serverNum, serverStr);
+        serverNum++;
+      }
+    }
+  }
+
+  public static boolean hasServers(Properties props) {
+    for (Object key : props.keySet())
+      if (((String)key).startsWith("server."))
+        return true;
+    return false;
+  }
+
+  // called by the modified version of parseProperties
+  // when the myid file is missing.
+  public Long getMySeverId() {
+    if (zkRun == null && solrPort == null) return null;
+
+    Map<Long, QuorumPeer.QuorumServer> slist = getServers();
+
+    String myHost = "localhost";
+    InetSocketAddress thisAddr = null;
+
+    if (zkRun != null && zkRun.length()>0) {
+      String parts[] = zkRun.split(":");
+      myHost = parts[0];
+      thisAddr = new InetSocketAddress(myHost, Integer.parseInt(parts[1]) + 1);
+    } else {
+      // default to localhost:<solrPort+1001>
+      thisAddr = new InetSocketAddress(myHost, Integer.parseInt(solrPort)+1001);
+    }
+
+
+    // first try a straight match by host
+    Long me = null;
+    boolean multiple = false;
+    int port = 0;
+    for (QuorumPeer.QuorumServer server : slist.values()) {
+      if (server.addr.getHostName().equals(myHost)) {
+        multiple = me!=null;
+        me = server.id;
+        port = server.addr.getPort();
+      }
+    }
+
+    if (!multiple) {
+      // only one host matched... assume it's me.
+      setClientPort(port - 1);
+      return me;
+    }
+
+    if (me == null) {
+      // no hosts matched.
+      return null;
+    }
+
+
+    // multiple matches... try to figure out by port.
+    for (QuorumPeer.QuorumServer server : slist.values()) {
+      if (server.addr.equals(thisAddr)) {
+        if (clientPortAddress != null || clientPortAddress.getPort() <= 0)
+          setClientPort(server.addr.getPort() - 1);
+        return server.id;
+      }
+    }
+
+    return null;
+  }
+
+
+
+  public void setDataDir(String dataDir) {
+    this.dataDir = dataDir;
+  }
+
+  public void setClientPort(int clientPort) {
+    if (clientPortAddress != null) {
+      try {
+        this.clientPortAddress = new InetSocketAddress(
+                InetAddress.getByName(clientPortAddress.getHostName()), clientPort);
+      } catch (UnknownHostException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      this.clientPortAddress = new InetSocketAddress(clientPort);
+    }
+  }
+
+
+  // NOTE: copied from ZooKeeper 3.2
+  /**
+   * Parse config from a Properties.
+   * @param zkProp Properties to parse from.
+   * @throws java.io.IOException
+   * @throws ConfigException
+   */
+  public void parseProperties(Properties zkProp)
+      throws IOException, ConfigException {
+    for (Entry<Object, Object> entry : zkProp.entrySet()) {
+      String key = entry.getKey().toString().trim();
+      String value = entry.getValue().toString().trim();
+      if (key.equals("dataDir")) {
+        dataDir = value;
+      } else if (key.equals("dataLogDir")) {
+        dataLogDir = value;
+      } else if (key.equals("clientPort")) {
+        setClientPort(Integer.parseInt(value));
+      } else if (key.equals("tickTime")) {
+        tickTime = Integer.parseInt(value);
+      } else if (key.equals("initLimit")) {
+        initLimit = Integer.parseInt(value);
+      } else if (key.equals("syncLimit")) {
+        syncLimit = Integer.parseInt(value);
+      } else if (key.equals("electionAlg")) {
+        electionAlg = Integer.parseInt(value);
+      } else if (key.equals("maxClientCnxns")) {
+        maxClientCnxns = Integer.parseInt(value);
+      } else if (key.startsWith("server.")) {
+        int dot = key.indexOf('.');
+        long sid = Long.parseLong(key.substring(dot + 1));
+        String parts[] = value.split(":");
+        if ((parts.length != 2) && (parts.length != 3)) {
+          LOG.error(value
+              + " does not have the form host:port or host:port:port");
+        }
+        InetSocketAddress addr = new InetSocketAddress(parts[0],
+            Integer.parseInt(parts[1]));
+        if (parts.length == 2) {
+          servers.put(Long.valueOf(sid), new QuorumPeer.QuorumServer(sid, addr));
+        } else if (parts.length == 3) {
+          InetSocketAddress electionAddr = new InetSocketAddress(
+              parts[0], Integer.parseInt(parts[2]));
+          servers.put(Long.valueOf(sid), new QuorumPeer.QuorumServer(sid, addr,
+              electionAddr));
+        }
+      } else if (key.startsWith("group")) {
+        int dot = key.indexOf('.');
+        long gid = Long.parseLong(key.substring(dot + 1));
+
+        numGroups++;
+
+        String parts[] = value.split(":");
+        for(String s : parts){
+          long sid = Long.parseLong(s);
+          if(serverGroup.containsKey(sid))
+            throw new ConfigException("Server " + sid + "is in multiple groups");
+          else
+            serverGroup.put(sid, gid);
+        }
+
+      } else if(key.startsWith("weight")) {
+        int dot = key.indexOf('.');
+        long sid = Long.parseLong(key.substring(dot + 1));
+        serverWeight.put(sid, Long.parseLong(value));
+      } else {
+        System.setProperty("zookeeper." + key, value);
+      }
+    }
+    if (dataDir == null) {
+      throw new IllegalArgumentException("dataDir is not set");
+    }
+    if (dataLogDir == null) {
+      dataLogDir = dataDir;
+    } else {
+      if (!new File(dataLogDir).isDirectory()) {
+        throw new IllegalArgumentException("dataLogDir " + dataLogDir
+            + " is missing.");
+      }
+    }
+
+    if (tickTime == 0) {
+      throw new IllegalArgumentException("tickTime is not set");
+    }
+    if (servers.size() > 1) {
+      if (initLimit == 0) {
+        throw new IllegalArgumentException("initLimit is not set");
+      }
+      if (syncLimit == 0) {
+        throw new IllegalArgumentException("syncLimit is not set");
+      }
+      /*
+      * If using FLE, then every server requires a separate election
+      * port.
+      */
+      if (electionAlg != 0) {
+        for (QuorumPeer.QuorumServer s : servers.values()) {
+          if (s.electionAddr == null)
+            throw new IllegalArgumentException(
+                "Missing election port for server: " + s.id);
+        }
+      }
+
+      /*
+      * Default of quorum config is majority
+      */
+      if(serverGroup.size() > 0){
+        if(servers.size() != serverGroup.size())
+          throw new ConfigException("Every server must be in exactly one group");
+        /*
+         * The deafult weight of a server is 1
+         */
+        for(QuorumPeer.QuorumServer s : servers.values()){
+          if(!serverWeight.containsKey(s.id))
+            serverWeight.put(s.id, (long) 1);
+        }
+
+        /*
+                      * Set the quorumVerifier to be QuorumHierarchical
+                      */
+        quorumVerifier = new QuorumHierarchical(numGroups,
+            serverWeight, serverGroup);
+      } else {
+        /*
+                      * The default QuorumVerifier is QuorumMaj
+                      */
+
+        LOG.info("Defaulting to majority quorums");
+        quorumVerifier = new QuorumMaj(servers.size());
+      }
+
+      File myIdFile = new File(dataDir, "myid");
+      if (!myIdFile.exists()) {
+        ///////////////// ADDED FOR SOLR //////
+        Long myid = getMySeverId();
+        if (myid != null) {
+          serverId = myid;
+          return;
+        }
+        if (zkRun == null) return;
+        //////////////// END ADDED FOR SOLR //////
+        throw new IllegalArgumentException(myIdFile.toString()
+            + " file is missing");
+      }
+
+      BufferedReader br = new BufferedReader(new FileReader(myIdFile));
+      String myIdString;
+      try {
+        myIdString = br.readLine();
+      } finally {
+        br.close();
+      }
+      try {
+        serverId = Long.parseLong(myIdString);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("serverid " + myIdString
+            + " is not a number");
+      }
+    }
+  }
+
+
+}



Mime
View raw message