lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1022188 [2/4] - in /lucene/dev/trunk/solr: ./ lib/ src/common/org/apache/solr/common/cloud/ src/common/org/apache/solr/common/params/ src/java/org/apache/solr/cloud/ src/java/org/apache/solr/core/ src/java/org/apache/solr/handler/admin/ sr...
Date Wed, 13 Oct 2010 17:01:33 GMT
Added: lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java (added)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,659 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handle ZooKeeper interactions.
+ * 
+ * notes: loads everything on init, creates what's not there - further updates
+ * are prompted with Watches.
+ * 
+ * TODO: exceptions during shutdown on attempts to update cloud state
+ * 
+ */
+public final class ZkController {
+
+  private static Logger log = LoggerFactory.getLogger(ZkController.class);
+
+  static final String NEWL = System.getProperty("line.separator");
+
+
+  private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
+  private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
+
+
+  // package private for tests
+
+  static final String CONFIGS_ZKNODE = "/configs";
+
+  public final static String COLLECTION_PARAM_PREFIX="collection.";
+  public final static String CONFIGNAME_PROP="configName";
+
+  private SolrZkClient zkClient;
+  
+  private ZkStateReader zkStateReader;
+
+  private String zkServerAddress;
+
+  private String localHostPort;
+  private String localHostContext;
+  private String localHostName;
+  private String localHost;
+
+  private String hostName;
+
+  /**
+   * @param zkServerAddress ZooKeeper server host address
+   * @param zkClientTimeout
+   * @param zkClientConnectTimeout
+   * @param localHost
+   * @param locaHostPort
+   * @param localHostContext
+   * @throws InterruptedException
+   * @throws TimeoutException
+   * @throws IOException
+   */
+  public ZkController(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
+      String localHostContext) throws InterruptedException,
+      TimeoutException, IOException {
+    this.zkServerAddress = zkServerAddress;
+    this.localHostPort = locaHostPort;
+    this.localHostContext = localHostContext;
+    this.localHost = localHost;
+
+    zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
+        // on reconnect, reload cloud info
+        new OnReconnect() {
+
+          public void command() {
+            try {
+              zkStateReader.makeCollectionsNodeWatches();
+              zkStateReader.makeShardsWatches(true);
+              createEphemeralLiveNode();
+              zkStateReader.updateCloudState(false);
+            } catch (KeeperException e) {
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            } catch (IOException e) {
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            }
+
+          }
+        });
+    zkStateReader = new ZkStateReader(zkClient);
+    init();
+  }
+
+  /**
+   * @param shardId
+   * @param collection
+   * @throws IOException
+   * @throws InterruptedException 
+   * @throws KeeperException 
+   */
+  private void addZkShardsNode(String shardId, String collection) throws IOException, InterruptedException, KeeperException {
+
+    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
+    
+    try {
+      
+      // shards node
+      if (!zkClient.exists(shardsZkPath)) {
+        if (log.isInfoEnabled()) {
+          log.info("creating zk shards node:" + shardsZkPath);
+        }
+        // makes shards zkNode if it doesn't exist
+        zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
+        
+        // TODO: consider how these notifications are being done
+        // ping that there is a new shardId
+        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+
+      }
+    } catch (KeeperException e) {
+      // its okay if another beats us creating the node
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        throw e;
+      }
+    }
+
+  }
+
+  /**
+   * Closes the underlying ZooKeeper client.
+   */
+  public void close() {
+    try {
+      zkClient.close();
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "", e);
+    }
+  }
+
+  /**
+   * @param collection
+   * @param fileName
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public boolean configFileExists(String collection, String fileName)
+      throws KeeperException, InterruptedException {
+    Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null);
+    return stat != null;
+  }
+
+  /**
+   * @return information about the cluster from ZooKeeper
+   */
+  public CloudState getCloudState() {
+    return zkStateReader.getCloudState();
+  }
+
+  /**
+   * @param zkConfigName
+   * @param fileName
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] getConfigFileData(String zkConfigName, String fileName)
+      throws KeeperException, InterruptedException {
+    String zkPath = CONFIGS_ZKNODE + "/" + zkConfigName + "/" + fileName;
+    byte[] bytes = zkClient.getData(zkPath, null, null);
+    if (bytes == null) {
+      log.error("Config file contains no data:" + zkPath);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "Config file contains no data:" + zkPath);
+    }
+    
+    return bytes;
+  }
+
+  // TODO: consider how this is done
+  private String getHostAddress() throws IOException {
+
+    if (localHost == null) {
+      localHost = "http://" + InetAddress.getLocalHost().getHostName();
+    } else {
+      Matcher m = URL_PREFIX.matcher(localHost);
+      if (m.matches()) {
+        String prefix = m.group(1);
+        localHost = prefix + localHost;
+      } else {
+        localHost = "http://" + localHost;
+      }
+    }
+
+    return localHost;
+  }
+  
+  public String getHostName() {
+    return hostName;
+  }
+
+  public SolrZkClient getZkClient() {
+    return zkClient;
+  }
+
+  /**
+   * @return
+   */
+  public String getZkServerAddress() {
+    return zkServerAddress;
+  }
+
+  private void init() {
+
+    try {
+      localHostName = getHostAddress();
+      Matcher m = URL_POST.matcher(localHostName);
+
+      if (m.matches()) {
+        hostName = m.group(1);
+      } else {
+        log.error("Unrecognized host:" + localHostName);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "Unrecognized host:" + localHostName);
+      }
+      
+      // makes nodes zkNode
+      try {
+        zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
+      } catch (KeeperException e) {
+        // its okay if another beats us creating the node
+        if (e.code() != KeeperException.Code.NODEEXISTS) {
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        }
+      }
+      
+      createEphemeralLiveNode();
+      setUpCollectionsNode();
+      zkStateReader.makeCollectionsNodeWatches();
+      
+    } catch (IOException e) {
+      log.error("", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Can't create ZooKeeperController", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "", e);
+    } catch (KeeperException e) {
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "", e);
+    }
+
+  }
+
+  private void createEphemeralLiveNode() throws KeeperException,
+      InterruptedException {
+    String nodeName = getNodeName();
+    String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
+    log.info("Register node as live in ZooKeeper:" + nodePath);
+    Watcher liveNodeWatcher = new Watcher() {
+
+      public void process(WatchedEvent event) {
+        try {
+          log.info("Updating live nodes:" + zkClient);
+          try {
+            zkStateReader.updateLiveNodes();
+          } finally {
+            // re-make watch
+
+            String path = event.getPath();
+            if(path == null) {
+              // on shutdown, it appears this can trigger with a null path
+              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+              return;
+            }
+            zkClient.getChildren(event.getPath(), this);
+          }
+        } catch (KeeperException e) {
+          if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+            return;
+          }
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (InterruptedException e) {
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (IOException e) {
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        }
+        
+      }
+      
+    };
+    try {
+      boolean nodeDeleted = true;
+      try {
+        // we attempt a delete in the case of a quick server bounce -
+        // if there was not a graceful shutdown, the node may exist
+        // until expiration timeout - so a node won't be created here because
+        // it exists, but eventually the node will be removed. So delete
+        // in case it exists and create a new node.
+        zkClient.delete(nodePath, -1);
+      } catch (KeeperException.NoNodeException e) {
+        // fine if there is nothing to delete
+        // TODO: annoying that ZK logs a warning on us
+        nodeDeleted = false;
+      }
+      if (nodeDeleted) {
+        log
+            .info("Found a previous node that still exists while trying to register a new live node "
+                + nodePath + " - removing existing node to create another.");
+      }
+      zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+    } catch (KeeperException e) {
+      // its okay if the node already exists
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        throw e;
+      }
+    }
+    zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
+  }
+  
+  public String getNodeName() {
+    return hostName + ":" + localHostPort + "_"+ localHostContext;
+  }
+
+  /**
+   * @param path
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public boolean pathExists(String path) throws KeeperException,
+      InterruptedException {
+    return zkClient.exists(path);
+  }
+
+  /**
+   * @param collection
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   * @throws IOException 
+   */
+  public String readConfigName(String collection) throws KeeperException,
+      InterruptedException, IOException {
+
+    String configName = null;
+
+    String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
+    if (log.isInfoEnabled()) {
+      log.info("Load collection config from:" + path);
+    }
+    byte[] data = zkClient.getData(path, null, null);
+    ZkNodeProps props = new ZkNodeProps();
+    
+    if(data != null) {
+      props.load(data);
+      configName = props.get(CONFIGNAME_PROP);
+    }
+    
+    if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName)) {
+      log.error("Specified config does not exist in ZooKeeper:" + configName);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "Specified config does not exist in ZooKeeper:" + configName);
+    }
+
+    return configName;
+  }
+
+  /**
+   * Register shard with ZooKeeper.
+   * 
+   * @param coreName
+   * @param cloudDesc
+   * @param forcePropsUpdate update solr.xml core props even if the shard is already registered
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void register(String coreName, CloudDescriptor cloudDesc, boolean forcePropsUpdate) throws IOException,
+      KeeperException, InterruptedException {
+    String shardUrl = localHostName + ":" + localHostPort + "/" + localHostContext
+        + "/" + coreName;
+    
+    String collection = cloudDesc.getCollectionName();
+    
+    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
+
+    boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
+    
+    if(shardZkNodeAlreadyExists && !forcePropsUpdate) {
+      return;
+    }
+    
+    if (log.isInfoEnabled()) {
+      log.info("Register shard - core:" + coreName + " address:"
+          + shardUrl);
+    }
+
+    ZkNodeProps props = new ZkNodeProps();
+    props.put(ZkStateReader.URL_PROP, shardUrl);
+    
+    props.put(ZkStateReader.NODE_NAME, getNodeName());
+
+    byte[] bytes = props.store();
+    
+    String shardZkNodeName = getNodeName() + "_" + coreName;
+
+    if(shardZkNodeAlreadyExists && forcePropsUpdate) {
+      zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
+      // tell everyone to update cloud info
+      zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+    } else {
+      addZkShardsNode(cloudDesc.getShardId(), collection);
+      try {
+        zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
+            CreateMode.PERSISTENT);
+        // tell everyone to update cloud info
+        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+      } catch (KeeperException e) {
+        // its okay if the node already exists
+        if (e.code() != KeeperException.Code.NODEEXISTS) {
+          throw e;
+        }
+        // for some reason the shard already exists, though it didn't when we
+        // started registration - just return
+        return;
+      }
+    }
+
+  }
+
+  /**
+   * @param coreName
+   * @param cloudDesc
+   */
+  public void unregister(String coreName, CloudDescriptor cloudDesc) {
+    // TODO : perhaps mark the core down in zk?
+  }
+
+  /**
+   * @param dir
+   * @param zkPath
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void uploadToZK(File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
+    File[] files = dir.listFiles();
+    for(File file : files) {
+      if (!file.getName().startsWith(".")) {
+        if (!file.isDirectory()) {
+          zkClient.setData(zkPath + "/" + file.getName(), file);
+        } else {
+          uploadToZK(file, zkPath + "/" + file.getName());
+        }
+      }
+    }
+  }
+  
+  /**
+   * @param dir
+   * @param configName
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void uploadConfigDir(File dir, String configName) throws IOException, KeeperException, InterruptedException {
+    uploadToZK(dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
+  }
+
+  // convenience for testing
+  void printLayoutToStdOut() throws KeeperException, InterruptedException {
+    zkClient.printLayoutToStdOut();
+  }
+
+  private void setUpCollectionsNode() throws KeeperException, InterruptedException {
+    try {
+      if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
+        if (log.isInfoEnabled()) {
+          log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
+        }
+        // makes collections zkNode if it doesn't exist
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
+      }
+    } catch (KeeperException e) {
+      // its okay if another beats us creating the node
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "", e);
+    }
+    
+  }
+
+  public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
+    String collection = cd.getCollectionName();
+    
+    log.info("Check for collection zkNode:" + collection);
+    String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
+    
+    try {
+      if(!zkClient.exists(collectionPath)) {
+        log.info("Creating collection in ZooKeeper:" + collection);
+       SolrParams params = cd.getParams();
+
+        try {
+          ZkNodeProps collectionProps = new ZkNodeProps();
+          // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
+          String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, "configuration1");
+
+          // params passed in - currently only done via core admin (create core commmand).
+          if (params != null) {
+            Iterator<String> iter = params.getParameterNamesIterator();
+            while (iter.hasNext()) {
+              String paramName = iter.next();
+              if (paramName.startsWith(COLLECTION_PARAM_PREFIX)) {
+                collectionProps.put(paramName.substring(COLLECTION_PARAM_PREFIX.length()), params.get(paramName));
+              }
+            }
+
+            // if the config name wasn't passed in, use the default
+            if (!collectionProps.containsKey(CONFIGNAME_PROP))
+              collectionProps.put(CONFIGNAME_PROP,  defaultConfigName);
+            
+          } else if(System.getProperty("bootstrap_confdir") != null) {
+            // if we are bootstrapping a collection, default the config for
+            // a new collection to the collection we are bootstrapping
+            log.info("Setting config for collection:" + collection + " to " + defaultConfigName);
+
+            Properties sysProps = System.getProperties();
+            for (String sprop : System.getProperties().stringPropertyNames()) {
+              if (sprop.startsWith(COLLECTION_PARAM_PREFIX)) {
+                collectionProps.put(sprop.substring(COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop));                
+              }
+            }
+            
+            // if the config name wasn't passed in, use the default
+            if (!collectionProps.containsKey(CONFIGNAME_PROP))
+              collectionProps.put(CONFIGNAME_PROP,  defaultConfigName);
+
+          } else {
+            // check for configName
+            log.info("Looking for collection configName");
+            int retry = 1;
+            for (; retry < 6; retry++) {
+              if (zkClient.exists(collectionPath)) {
+                collectionProps = new ZkNodeProps();
+                collectionProps.load(zkClient.getData(collectionPath, null, null));
+                if (collectionProps.containsKey(CONFIGNAME_PROP)) {
+                  break;
+                }
+              }
+              log.info("Could not find collection configName - pausing for 2 seconds and trying again - try: " + retry);
+              Thread.sleep(2000);
+            }
+            if (retry == 6) {
+              log.error("Could not find configName for collection " + collection);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR,
+                  "Could not find configName for collection " + collection);
+            }
+          }
+          
+          zkClient.makePath(collectionPath, collectionProps.store(), CreateMode.PERSISTENT, null, true);
+         
+          // ping that there is a new collection
+          zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+        } catch (KeeperException e) {
+          // its okay if the node already exists
+          if (e.code() != KeeperException.Code.NODEEXISTS) {
+            throw e;
+          }
+        }
+      } else {
+        log.info("Collection zkNode exists");
+      }
+      
+    } catch (KeeperException e) {
+      // its okay if another beats us creating the node
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        throw e;
+      }
+    }
+    
+  }
+  
+  public ZkStateReader getZkStateReader() {
+    return zkStateReader;
+  }
+
+}

Added: lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java (added)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,120 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ResourceLoader that works with ZooKeeper.
+ *
+ */
+public class ZkSolrResourceLoader extends SolrResourceLoader {
+
+  private final String collectionZkPath;
+  private ZkController zkController;
+
+  public ZkSolrResourceLoader(String instanceDir, String collection,
+      ZkController zooKeeperController) {
+    super(instanceDir);
+    this.zkController = zooKeeperController;
+    collectionZkPath = ZkController.CONFIGS_ZKNODE + "/" + collection;
+  }
+
+  /**
+   * <p>
+   * This loader will first attempt to load resources from ZooKeeper, but if not found
+   * will delegate to the context classloader when possible,
+   * otherwise it will attempt to resolve resources using any jar files found in
+   * the "lib/" directory in the specified instance directory.
+   * <p>
+   */
+  public ZkSolrResourceLoader(String instanceDir, String collection, ClassLoader parent,
+      Properties coreProperties, ZkController zooKeeperController) {
+    super(instanceDir, parent, coreProperties);
+    this.zkController = zooKeeperController;
+    collectionZkPath = ZkController.CONFIGS_ZKNODE + "/" + collection;
+  }
+
+  /**
+   * Opens any resource by its name. By default, this will look in multiple
+   * locations to load the resource: $configDir/$resource from ZooKeeper.
+   * It will look for it in any jar
+   * accessible through the class loader if it cannot be found in ZooKeeper. 
+   * Override this method to customize loading resources.
+   * 
+   * @return the stream for the named resource
+   */
+  public InputStream openResource(String resource) {
+    InputStream is = null;
+    String file = collectionZkPath + "/" + resource;
+    try {
+      if (zkController.pathExists(file)) {
+        byte[] bytes = zkController.getZkClient().getData(collectionZkPath + "/" + resource, null, null);
+        return new ByteArrayInputStream(bytes);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Error opening " + file, e);
+    }
+    try {
+      // delegate to the class loader (looking into $INSTANCE_DIR/lib jars)
+      is = classLoader.getResourceAsStream(resource);
+    } catch (Exception e) {
+      throw new RuntimeException("Error opening " + resource, e);
+    }
+    if (is == null) {
+      throw new RuntimeException("Can't find resource '" + resource
+          + "' in classpath or '" + collectionZkPath + "', cwd="
+          + System.getProperty("user.dir"));
+    }
+    return is;
+  }
+
+  public String getConfigDir() {
+    throw new ZooKeeperException(
+        ErrorCode.SERVER_ERROR,
+        "ZkSolrResourceLoader does not support getConfigDir() - likely, what you are trying to do is not supported in ZooKeeper mode");
+  }
+  
+  public String[] listConfigDir() {
+    List<String> list;
+    try {
+      list = zkController.getZkClient().getChildren(collectionZkPath, null);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "", e);
+    } catch (KeeperException e) {
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "", e);
+    }
+    return list.toArray(new String[0]);
+  }
+  
+}

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java Wed Oct 13 17:01:13 2010
@@ -21,6 +21,7 @@ import java.io.*;
 import java.nio.channels.FileChannel;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 import java.text.SimpleDateFormat;
 
 import org.slf4j.Logger;
@@ -31,13 +32,19 @@ import javax.xml.xpath.XPathConstants;
 import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathExpressionException;
 
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.SolrZkServer;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkSolrResourceLoader;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.DOMUtil;
 import org.apache.solr.common.util.XML;
 import org.apache.solr.common.util.FileUtils;
 import org.apache.solr.handler.admin.CoreAdminHandler;
 import org.apache.solr.schema.IndexSchema;
+import org.apache.zookeeper.KeeperException;
 import org.apache.commons.io.IOUtils;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
@@ -58,6 +65,9 @@ public class CoreContainer 
   protected boolean persistent = false;
   protected String adminPath = null;
   protected String managementPath = null;
+  protected String hostPort;
+  protected String hostContext;
+  protected String host;
   protected CoreAdminHandler coreAdminHandler = null;
   protected File configFile = null;
   protected String libDir = null;
@@ -68,12 +78,86 @@ public class CoreContainer 
   protected String adminHandler;
   protected boolean shareSchema;
   protected String solrHome;
+  @Deprecated
   protected String solrConfigFilenameOverride;
   private String defaultCoreName = "";
+  private ZkController zkController;
+  private SolrZkServer zkServer;
+
+  private String zkHost;
   
   public CoreContainer() {
     solrHome = SolrResourceLoader.locateSolrHome();
   }
+  
+  private void initZooKeeper(String zkHost, int zkClientTimeout) {
+    // if zkHost sys property is not set, we are not using ZooKeeper
+    String zookeeperHost;
+    if(zkHost == null) {
+      zookeeperHost = System.getProperty("zkHost");
+    } else {
+      zookeeperHost = zkHost;
+    }
+
+    String zkRun = System.getProperty("zkRun");
+
+    if (zkRun == null && zookeeperHost == null)
+        return;  // not in zk mode
+
+    zkServer = new SolrZkServer(zkRun, zookeeperHost, solrHome, hostPort);
+    zkServer.parseConfig();
+    zkServer.start();
+
+    // set client from server config if not already set
+    if (zookeeperHost == null) {
+      zookeeperHost = zkServer.getClientString();
+    }
+
+    int zkClientConnectTimeout = 5000;
+
+    if (zookeeperHost != null) {
+      // we are ZooKeeper enabled
+      try {
+        // If this is an ensemble, allow for a long connect time for other servers to come up
+        if (zkRun != null && zkServer.getServers().size() > 1) {
+          zkClientConnectTimeout = 24 * 60 * 60 * 1000;  // 1 day for embedded ensemble
+          log.info("Zookeeper client=" + zookeeperHost + "  Waiting for a quorum.");
+        } else {
+          log.info("Zookeeper client=" + zookeeperHost);          
+        }
+        zkController = new ZkController(zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext);
+        
+        String confDir = System.getProperty("bootstrap_confdir");
+        if(confDir != null) {
+          File dir = new File(confDir);
+          if(!dir.isDirectory()) {
+            throw new IllegalArgumentException("bootstrap_confdir must be a directory of configuration files");
+          }
+          String confName = System.getProperty(ZkController.COLLECTION_PARAM_PREFIX+ZkController.CONFIGNAME_PROP, "configuration1");
+          zkController.uploadConfigDir(dir, confName);
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (TimeoutException e) {
+        log.error("Could not connect to ZooKeeper", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (IOException e) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (KeeperException e) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      }
+    }
+    
+  }
 
   public Properties getContainerProperties() {
     return containerProperties;
@@ -82,6 +166,7 @@ public class CoreContainer 
   // Helper class to initialize the CoreContainer
   public static class Initializer {
     protected String solrConfigFilename = null;
+    protected String dataDir = null; // override datadir for single core mode
 
     /**
      * @deprecated all cores now abort on configuration error regardless of configuration
@@ -106,7 +191,8 @@ public class CoreContainer 
     public String getSolrConfigFilename() {
       return solrConfigFilename;
     }
-   @Deprecated
+
+    @Deprecated
     public void setSolrConfigFilename(String solrConfigFilename) {
       this.solrConfigFilename = solrConfigFilename;
     }
@@ -116,14 +202,16 @@ public class CoreContainer 
         ParserConfigurationException, SAXException {
       CoreContainer cores = null;
       String solrHome = SolrResourceLoader.locateSolrHome();
+      // TODO : fix broken logic confusing solr.xml with solrconfig.xml
       File fconf = new File(solrHome, solrConfigFilename == null ? "solr.xml"
           : solrConfigFilename);
       log.info("looking for solr.xml: " + fconf.getAbsolutePath());
       cores = new CoreContainer();
-      cores.solrConfigFilenameOverride = solrConfigFilename;
+      
       if (fconf.exists()) {
         cores.load(solrHome, fconf);
       } else {
+        log.info("no solr.xml file found - using default");
         cores.load(solrHome, new ByteArrayInputStream(DEF_SOLR_XML.getBytes()));
         cores.configFile = fconf;
       }
@@ -219,15 +307,29 @@ public class CoreContainer 
       if(dcoreName != null) {
         defaultCoreName = dcoreName;
       }
-      persistent = cfg.getBool( "solr/@persistent", false );
-      libDir     = cfg.get(     "solr/@sharedLib", null);
-      adminPath  = cfg.get(     "solr/cores/@adminPath", null );
-      shareSchema = cfg.getBool("solr/cores/@shareSchema", false );
+      persistent = cfg.getBool("solr/@persistent", false);
+      libDir = cfg.get("solr/@sharedLib", null);
+      zkHost = cfg.get("solr/@zkHost" , null);
+      adminPath = cfg.get("solr/cores/@adminPath", null);
+      shareSchema = cfg.getBool("solr/cores/@shareSchema", false);
+      int zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout", 10000);
+
+      hostPort = System.getProperty("hostPort");
+      if (hostPort == null) {
+        hostPort = cfg.get("solr/cores/@hostPort", "8983");
+      }
+
+      hostContext = cfg.get("solr/cores/@hostContext", "solr");
+      host = cfg.get("solr/cores/@host", null);
+
       if(shareSchema){
         indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
       }
       adminHandler  = cfg.get("solr/cores/@adminHandler", null );
       managementPath  = cfg.get("solr/cores/@managementPath", null );
+      
+      zkClientTimeout = Integer.parseInt(System.getProperty("zkClientTimeout", Integer.toString(zkClientTimeout)));
+      initZooKeeper(zkHost, zkClientTimeout);
 
       if (libDir != null) {
         File f = FileUtils.resolvePath(new File(dir), libDir);
@@ -249,25 +351,9 @@ public class CoreContainer 
         SolrConfig.severeErrors.add(e);
         SolrException.logOnce(log,null,e);
       }
-      
-      // before looping over each core, let's check the names and fail 
-      // fast if the same one is reused multiple times.
-      { // local scope, won't need these vars again
-        NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core/@name", 
-                                                XPathConstants.NODESET);
-        Set<String> names = new HashSet<String>();
-        for (int i=0; i<nodes.getLength(); i++) {
-          String name = DOMUtil.getText(nodes.item(i));
-          if (names.contains(name)) {
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                                    "Multiple cores found with same name: " + 
-                                    name);
-          }
-          names.add(name);
-        }
-      }
 
       NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core", XPathConstants.NODESET);
+      boolean defaultCoreFound = false;
       for (int i=0; i<nodes.getLength(); i++) {
         Node node = nodes.item(i);
         try {
@@ -282,12 +368,12 @@ public class CoreContainer 
             // be mapped to this.
             name="";
           }
-
           CoreDescriptor p = new CoreDescriptor(this, name, DOMUtil.getAttr(node, "instanceDir", null));
 
           // deal with optional settings
           String opt = DOMUtil.getAttr(node, "config", null);
-          if(solrConfigFilenameOverride != null && name.equals("")) {
+
+          if(solrConfigFilenameOverride != null) {
             p.setConfigName(solrConfigFilenameOverride);
           } else if (opt != null) {
             p.setConfigName(opt);
@@ -296,6 +382,16 @@ public class CoreContainer 
           if (opt != null) {
             p.setSchemaName(opt);
           }
+          if (zkController != null) {
+            opt = DOMUtil.getAttr(node, "shard", null);
+            if (opt != null && opt.length() > 0) {
+              p.getCloudDescriptor().setShardId(opt);
+            }
+            opt = DOMUtil.getAttr(node, "collection", null);
+            if (opt != null) {
+              p.getCloudDescriptor().setCollectionName(opt);
+            }
+          }
           opt = DOMUtil.getAttr(node, "properties", null);
           if (opt != null) {
             p.setPropertiesName(opt);
@@ -315,13 +411,35 @@ public class CoreContainer 
           SolrException.logOnce(log,null,ex);
         }
       }
-    }
-
-    finally {
+    } finally {
       if (cfgis != null) {
         try { cfgis.close(); } catch (Exception xany) {}
       }
     }
+    
+    
+    if(zkController != null) {
+      try {
+        synchronized (zkController.getZkStateReader().getUpdateLock()) {
+          zkController.getZkStateReader().makeShardZkNodeWatches(false);
+          zkController.getZkStateReader().updateCloudState(true);
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (KeeperException e) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (IOException e) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      }
+    }
   }
 
   private Properties readProperties(Config cfg, Node node) throws XPathExpressionException {
@@ -346,6 +464,12 @@ public class CoreContainer 
         }
         cores.clear();
       } finally {
+        if(zkController != null) {
+          zkController.close();
+        }
+        if (zkServer != null) {
+          zkServer.stop();
+        }
         isShutDown = true;
       }
     }
@@ -385,6 +509,24 @@ public class CoreContainer 
       core.setName(name);
     }
 
+    if (zkController != null) {
+      try {
+        zkController.register(core.getName(), core.getCoreDescriptor().getCloudDescriptor(), true);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      } catch (KeeperException e) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      } catch (IOException e) {
+        log.error("", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+      }
+    }
 
     if( old == null || old == core) {
       log.info( "registering core: "+name );
@@ -427,32 +569,88 @@ public class CoreContainer 
     String instanceDir = idir.getPath();
     
     // Initialize the solr config
-    SolrResourceLoader solrLoader = new SolrResourceLoader(instanceDir, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()));
-    SolrConfig config = new SolrConfig(solrLoader, dcore.getConfigName(), null);
-
+    SolrResourceLoader solrLoader = null;
+    
+    SolrConfig config = null;
+    String zkConfigName = null;
+    if(zkController == null) {
+      solrLoader = new SolrResourceLoader(instanceDir, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()));
+      config = new SolrConfig(solrLoader, dcore.getConfigName(), null);
+    } else {
+      try {
+        String collection = dcore.getCloudDescriptor().getCollectionName();
+        zkController.createCollectionZkNode(dcore.getCloudDescriptor());
+        // zkController.createCollectionZkNode(collection);
+        zkConfigName = zkController.readConfigName(collection);
+        if (zkConfigName == null) {
+          log.error("Could not find config name for collection:" + collection);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "Could not find config name for collection:" + collection);
+        }
+        solrLoader = new ZkSolrResourceLoader(instanceDir, zkConfigName, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()), zkController);
+        config = getSolrConfigFromZk(zkConfigName, dcore.getConfigName(), solrLoader);
+      } catch (KeeperException e) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      }
+    }
+    
     IndexSchema schema = null;
-    if(indexSchemaCache != null){
-      //schema sharing is enabled. so check if it already is loaded
-      File schemaFile = new File(dcore.getSchemaName());
-      if (!schemaFile.isAbsolute()) {
-        schemaFile = new File(solrLoader.getInstanceDir() + "conf" + File.separator + dcore.getSchemaName());
-      }
-      if(schemaFile. exists()){
-        String key = schemaFile.getAbsolutePath()+":"+new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date(schemaFile.lastModified()));
-        schema = indexSchemaCache.get(key);
-        if(schema == null){
-          log.info("creating new schema object for core: " + dcore.name);
-          schema = new IndexSchema(config, dcore.getSchemaName(), null);
-          indexSchemaCache.put(key,schema);
-        } else {
-          log.info("re-using schema object for core: " + dcore.name);
+    if (indexSchemaCache != null) {
+      if (zkController != null) {
+        File schemaFile = new File(dcore.getSchemaName());
+        if (!schemaFile.isAbsolute()) {
+          schemaFile = new File(solrLoader.getInstanceDir() + "conf"
+              + File.separator + dcore.getSchemaName());
+        }
+        if (schemaFile.exists()) {
+          String key = schemaFile.getAbsolutePath()
+              + ":"
+              + new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date(
+                  schemaFile.lastModified()));
+          schema = indexSchemaCache.get(key);
+          if (schema == null) {
+            log.info("creating new schema object for core: " + dcore.name);
+            schema = new IndexSchema(config, dcore.getSchemaName(), null);
+            indexSchemaCache.put(key, schema);
+          } else {
+            log.info("re-using schema object for core: " + dcore.name);
+          }
         }
+      } else {
+        // TODO: handle caching from ZooKeeper - perhaps using ZooKeepers versioning
+        // Don't like this cache though - how does it empty as last modified changes?
       }
     }
     if(schema == null){
-      schema = new IndexSchema(config, dcore.getSchemaName(), null);
+      if(zkController != null) {
+        try {
+          schema = getSchemaFromZk(zkConfigName, dcore.getSchemaName(), config, solrLoader);
+        } catch (KeeperException e) {
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (InterruptedException e) {
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        }
+      } else {
+        schema = new IndexSchema(config, dcore.getSchemaName(), null);
+      }
     }
-    SolrCore core = new SolrCore(dcore.getName(), null, config, schema, dcore);
+    String dataDir = null;
+
+    SolrCore core = new SolrCore(dcore.getName(), dataDir, config, schema, dcore);
     return core;
   }
     
@@ -744,6 +942,18 @@ public class CoreContainer 
     }
     opt = dcore.dataDir;
     if (opt != null) writeAttribute(w,"dataDir",opt);
+
+    CloudDescriptor cd = dcore.getCloudDescriptor();
+    if (cd != null) {
+      opt = cd.getShardId();
+      if (opt != null)
+        writeAttribute(w,"shard",opt);
+      // only write out the collection name if it's not the default (the core name)
+      opt = cd.getCollectionName();
+      if (opt != null && !opt.equals(dcore.name))
+        writeAttribute(w,"collection",opt);
+    }
+
     if (dcore.getCoreProperties() == null || dcore.getCoreProperties().isEmpty())
       w.write("/>\n"); // core
     else  {
@@ -801,6 +1011,37 @@ public class CoreContainer 
   public String getSolrHome() {
     return solrHome;
   }
+  
+  public boolean isZooKeeperAware() {
+    return zkController != null;
+  }
+  
+  public ZkController getZkController() {
+    return zkController;
+  }
+  
+  private SolrConfig getSolrConfigFromZk(String zkConfigName, String solrConfigFileName,
+      SolrResourceLoader resourceLoader) throws IOException,
+      ParserConfigurationException, SAXException, KeeperException,
+      InterruptedException {
+    byte[] config = zkController.getConfigFileData(zkConfigName, solrConfigFileName);
+    InputStream is = new ByteArrayInputStream(config);
+    SolrConfig cfg = solrConfigFileName == null ? new SolrConfig(
+        resourceLoader, SolrConfig.DEFAULT_CONF_FILE, is) : new SolrConfig(
+        resourceLoader, solrConfigFileName, is);
+
+    return cfg;
+  }
+  
+  private IndexSchema getSchemaFromZk(String zkConfigName, String schemaName,
+      SolrConfig config, SolrResourceLoader resourceLoader)
+      throws KeeperException, InterruptedException {
+    byte[] configBytes = zkController.getConfigFileData(zkConfigName, schemaName);
+    InputStream is = new ByteArrayInputStream(configBytes);
+    IndexSchema schema = new IndexSchema(config, schemaName, is);
+    return schema;
+  }
+  
   private static final String DEF_SOLR_XML ="<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n" +
           "<solr persistent=\"false\">\n" +
           "  <cores adminPath=\"/admin/cores\" defaultCoreName=\"" + DEFAULT_DEFAULT_CORE_NAME + "\">\n" +

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java Wed Oct 13 17:01:13 2010
@@ -20,6 +20,8 @@ package org.apache.solr.core;
 import java.util.Properties;
 import java.io.File;
 
+import org.apache.solr.cloud.CloudDescriptor;
+
 /**
  * A Solr core descriptor
  *
@@ -34,10 +36,20 @@ public class CoreDescriptor {
   protected String schemaName;
   private final CoreContainer coreContainer;
   private Properties coreProperties;
+  
+  private CloudDescriptor cloudDesc;
 
   public CoreDescriptor(CoreContainer coreContainer, String name, String instanceDir) {
     this.coreContainer = coreContainer;
     this.name = name;
+    
+    if(coreContainer.getZkController() != null) {
+      this.cloudDesc = new CloudDescriptor();
+      // cloud collection defaults to core name
+      cloudDesc.setCollectionName(name == "" ? coreContainer.getDefaultCoreName() : name);
+      this.cloudDesc.setShardId(coreContainer.getZkController().getNodeName() + "_" + name);
+    }
+    
     if (name == null) {
       throw new RuntimeException("Core needs a name");
     }
@@ -112,6 +124,10 @@ public class CoreDescriptor {
     // normalize zero length to null.
     if (dataDir != null && dataDir.length()==0) dataDir=null;
   }
+  
+  public boolean usingDefaultDataDir() {
+    return this.dataDir == null;
+  }
 
   /**@return the core instance directory. */
   public String getInstanceDir() {
@@ -171,4 +187,12 @@ public class CoreDescriptor {
         this.coreProperties.putAll(coreProperties);
     }
   }
+
+  public CloudDescriptor getCloudDescriptor() {
+    return cloudDesc;
+  }
+  
+  public void setCloudDescriptor(CloudDescriptor cloudDesc) {
+    this.cloudDesc = cloudDesc;
+  }
 }

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java Wed Oct 13 17:01:13 2010
@@ -512,7 +512,8 @@ public final class SolrCore implements S
     this.setName( name );
     resourceLoader = config.getResourceLoader();
     if (dataDir == null){
-      dataDir =  config.getDataDir();
+      // nocommit: why did solrconfig override core descriptor !?
+      if(cd.usingDefaultDataDir()) dataDir = config.getDataDir();
       if(dataDir == null) dataDir = cd.getDataDir();
     }
 
@@ -1556,12 +1557,10 @@ public final class SolrCore implements S
         
         // Hide everything...
         Set<String> hide = new HashSet<String>();
-        File configdir = new File( solrConfig.getResourceLoader().getConfigDir() ); 
-        if( configdir.exists() && configdir.isDirectory() ) {
-          for( String file : configdir.list() ) {
-            hide.add( file.toUpperCase(Locale.ENGLISH) );
-          }
-        }
+
+        for (String file : solrConfig.getResourceLoader().listConfigDir()) {
+          hide.add(file.toUpperCase(Locale.ENGLISH));
+        }    
         
         // except the "gettable" list
         StringTokenizer st = new StringTokenizer( gettable );
@@ -1588,16 +1587,7 @@ public final class SolrCore implements S
           "solrconfig.xml uses deprecated <bool name='facet.sort'>. Please "+
           "update your config to use <string name='facet.sort'>.");
     }
-
-    if (!solrConfig.getBool("abortOnConfigurationError",true))
-      throw new SolrException(ErrorCode.SERVER_ERROR,
-                              "Setting abortOnConfigurationError==false is no longer supported");
-    if (null != solrConfig.getVal("abortOnConfigurationError", false))
-      log.warn("The abortOnConfigurationError option is no longer supported "+
-               "in solrconfig.xml.  Setting it has no effect.");
-    
-  }
-  
+  } 
 
   public CoreDescriptor getCoreDescriptor() {
     return coreDescriptor;

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java Wed Oct 13 17:01:13 2010
@@ -69,7 +69,7 @@ public class SolrResourceLoader implemen
   static final String base = "org.apache" + "." + project;
   static final String[] packages = {"","analysis.","schema.","handler.","search.","update.","core.","response.","request.","update.processor.","util.", "spelling.", "handler.component.", "handler.dataimport." };
 
-  private URLClassLoader classLoader;
+  protected URLClassLoader classLoader;
   private final String instanceDir;
   private String dataDir;
   
@@ -205,6 +205,15 @@ public class SolrResourceLoader implemen
   public  static String normalizeDir(String path) {
     return ( path != null && (!(path.endsWith("/") || path.endsWith("\\"))) )? path + File.separator : path;
   }
+  
+  public String[] listConfigDir() {
+    File configdir = new File(getConfigDir());
+    if( configdir.exists() && configdir.isDirectory() ) {
+      return configdir.list();
+    } else {
+      return new String[0];
+    }
+  }
 
   public String getConfigDir() {
     return instanceDir + "conf/";

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Wed Oct 13 17:01:13 2010
@@ -17,6 +17,7 @@
 
 package org.apache.solr.handler.admin;
 
+import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -228,7 +229,14 @@ public class CoreAdminHandler extends Re
     try {
       SolrParams params = req.getParams();
       String name = params.get(CoreAdminParams.NAME);
-      CoreDescriptor dcore = new CoreDescriptor(coreContainer, name, params.get(CoreAdminParams.INSTANCE_DIR));
+
+      String instanceDir = params.get(CoreAdminParams.INSTANCE_DIR);
+      if (instanceDir == null) {
+        // instanceDir = coreContainer.getSolrHome() + "/" + name;
+        instanceDir = name; // bare name is already relative to solr home
+      }
+
+      CoreDescriptor dcore = new CoreDescriptor(coreContainer, name, instanceDir);
 
       //  fillup optional parameters
       String opts = params.get(CoreAdminParams.CONFIG);
@@ -243,6 +251,19 @@ public class CoreAdminHandler extends Re
       if (opts != null)
         dcore.setDataDir(opts);
 
+      CloudDescriptor cd = dcore.getCloudDescriptor();
+      if (cd != null) {
+        cd.setParams(req.getParams());
+
+        opts = params.get(CoreAdminParams.COLLECTION);
+        if (opts != null)
+          cd.setCollectionName(opts);
+        
+        opts = params.get(CoreAdminParams.SHARD);
+        if (opts != null)
+          cd.setShardId(opts);
+      }
+
       dcore.setCoreProperties(null);
       SolrCore core = coreContainer.create(dcore);
       coreContainer.register(name, core, false);

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java Wed Oct 13 17:01:13 2010
@@ -22,12 +22,19 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.queryParser.ParseException;
 import org.apache.lucene.search.*;
 import org.apache.lucene.util.BytesRef;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.*;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.schema.FieldType;
@@ -110,11 +117,116 @@ public class QueryComponent extends Sear
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
     }
 
-    // TODO: temporary... this should go in a different component.
+    checkDistributed(rb);
+  }
+
+
+  // TODO: this could go in a different component, or in SearchHandler
+  // check if this is a distributed request and set info on the response builder
+  void checkDistributed(ResponseBuilder rb) {
+    SolrQueryRequest req = rb.req;
+    SolrParams params = req.getParams();
+
+    rb.isDistrib = params.getBool("distrib",false);
     String shards = params.get(ShardParams.SHARDS);
-    if (shards != null) {
-      List<String> lst = StrUtils.splitSmart(shards, ",", true);
-      rb.shards = lst.toArray(new String[lst.size()]);
+
+    // for back compat, a shards param with URLs like localhost:8983/solr will mean that this
+    // search is distributed.
+    boolean hasShardURL = shards != null && shards.indexOf('/') > 0;
+    rb.isDistrib = hasShardURL | rb.isDistrib;
+
+    if (rb.isDistrib) {
+      // since the cost of grabbing cloud state is still up in the air, we grab it only
+      // if we need it.
+      CloudState cloudState = null;
+      Map<String,Slice> slices = null;
+      CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
+      CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
+      ZkController zkController = coreDescriptor.getCoreContainer().getZkController();
+
+
+      if (shards != null) {
+        List<String> lst = StrUtils.splitSmart(shards, ",", true);
+        rb.shards = lst.toArray(new String[lst.size()]);
+        rb.slices = new String[rb.shards.length];
+
+        if (zkController != null) {
+          // figure out which shards are slices
+          for (int i=0; i<rb.shards.length; i++) {
+            if (rb.shards[i].indexOf('/') < 0) {
+              // this is a logical shard
+              rb.slices[i] = rb.shards[i];
+              rb.shards[i] = null;
+            }
+          }
+        }
+      } else if (zkController != null) {
+        // we weren't provided with a list of slices to query, so find the list that will cover the complete index
+
+        cloudState =  zkController.getCloudState();
+
+        // TODO: check "collection" for which collection(s) to search.. but for now, just default
+        // to the collection for this core.
+        // This can be more efficient... we only record the name, even though we have the
+        // shard info we need in the next step of mapping slice->shards
+        slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+        rb.slices = slices.keySet().toArray(new String[slices.size()]);
+        rb.shards = new String[rb.slices.length];
+        
+        /***
+         rb.slices = new String[slices.size()];
+         for (int i=0; i<rb.slices.length; i++) {
+         rb.slices[i] = slices.get(i).getName();
+         }
+         ***/
+      }
+
+      //
+      // Map slices to shards
+      //
+      if (zkController != null) {
+        for (int i=0; i<rb.shards.length; i++) {
+          if (rb.shards[i] == null) {
+            if (cloudState == null) {
+              cloudState =  zkController.getCloudState();
+              slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+            }
+            String sliceName = rb.slices[i];
+
+            Slice slice = slices.get(sliceName);
+
+            if (slice==null) {
+              // Treat this the same as "all servers down" for a slice, and let things continue
+              // if partial results are acceptable
+              rb.shards[i] = "";
+              continue;
+              // throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "no such shard: " + sliceName);
+            }
+
+            Map<String, ZkNodeProps> sliceShards = slice.getShards();
+
+            // For now, recreate the | delimited list of equivalent servers
+            Set<String> liveNodes = cloudState.getLiveNodes();
+            StringBuilder sliceShardsStr = new StringBuilder();
+            boolean first = true;
+            for (ZkNodeProps nodeProps : sliceShards.values()) {
+              if (!liveNodes.contains(nodeProps.get(ZkStateReader.NODE_NAME)))
+                continue;
+              if (first) {
+                first = false;
+              } else {
+                sliceShardsStr.append('|');
+              }
+              String url = nodeProps.get("url");
+              if (url.startsWith("http://"))
+                url = url.substring(7);
+              sliceShardsStr.append(url);
+            }
+
+            rb.shards[i] = sliceShardsStr.toString();
+          }
+        }
+      }
     }
     String shards_rows = params.get(ShardParams.SHARDS_ROWS);
     if(shards_rows != null) {

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java Wed Oct 13 17:01:13 2010
@@ -32,7 +32,6 @@ import java.util.WeakHashMap;
 
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.QueryElevationParams;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +47,7 @@ import org.apache.lucene.index.IndexRead
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
 import org.apache.lucene.util.StringHelper;
+import org.apache.solr.cloud.ZkController;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.SolrParams;
@@ -172,19 +172,30 @@ public class QueryElevationComponent ext
               "QueryElevationComponent must specify argument: '"+CONFIG_FILE
               +"' -- path to elevate.xml" );
         }
-        File fC = new File( core.getResourceLoader().getConfigDir(), f );
-        File fD = new File( core.getDataDir(), f );
-        if( fC.exists() == fD.exists() ) {
-          throw new SolrException( SolrException.ErrorCode.SERVER_ERROR,
-              "QueryElevationComponent missing config file: '"+f + "\n"
-              +"either: "+fC.getAbsolutePath() + " or " + fD.getAbsolutePath() + " must exist, but not both." );
-        }
-        if( fC.exists() ) {
-          log.info( "Loading QueryElevation from: "+fC.getAbsolutePath() );
-          Config cfg = new Config( core.getResourceLoader(), f );
-          elevationCache.put(null, loadElevationMap( cfg ));
+        boolean exists = false;
+
+        // check if using ZooKeeper
+        ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+        if(zkController != null) {
+          // TODO : shouldn't have to keep reading the config name when it has been read before
+          exists = zkController.configFileExists(zkController.readConfigName(core.getCoreDescriptor().getCloudDescriptor().getCollectionName()), f);
+        } else {
+          File fC = new File( core.getResourceLoader().getConfigDir(), f );
+          File fD = new File( core.getDataDir(), f );
+          if( fC.exists() == fD.exists() ) {
+            throw new SolrException( SolrException.ErrorCode.SERVER_ERROR,
+                "QueryElevationComponent missing config file: '"+f + "\n"
+                +"either: "+fC.getAbsolutePath() + " or " + fD.getAbsolutePath() + " must exist, but not both." );
+          }
+          if( fC.exists() ) {
+            exists = true;
+            log.info( "Loading QueryElevation from: "+ fC.getAbsolutePath() );
+            Config cfg = new Config( core.getResourceLoader(), f );
+            elevationCache.put(null, loadElevationMap( cfg ));
+          } 
         }
-        else {
+        
+        if (!exists){
           // preload the first data
           RefCounted<SolrIndexSearcher> searchHolder = null;
           try {

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java Wed Oct 13 17:01:13 2010
@@ -26,15 +26,11 @@ import org.apache.solr.request.SolrQuery
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.search.DocListAndSet;
 import org.apache.solr.search.QParser;
-import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.search.SortSpec;
-import org.apache.solr.util.SolrPluginUtils;
+import org.apache.solr.search.SolrIndexSearcher;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * This class is experimental and will be changing in the future.
@@ -42,9 +38,8 @@ import java.util.Set;
  * @version $Id$
  * @since solr 1.3
  */
-public class ResponseBuilder {
-
-
+public class ResponseBuilder
+{
   public SolrQueryRequest req;
   public SolrQueryResponse rsp;
   public boolean doHighlights;
@@ -101,7 +96,9 @@ public class ResponseBuilder {
   public int stage;  // What stage is this current request at?
 
   //The address of the Shard
+  boolean isDistrib; // is this a distributed search?
   public String[] shards;
+  public String[] slices; // the optional logical ids of the shards
   public int shards_rows = -1;
   public int shards_start = -1;
   public List<ShardRequest> outgoing;  // requests to be sent

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java Wed Oct 13 17:01:13 2010
@@ -17,32 +17,53 @@
 
 package org.apache.solr.handler.component;
 
-import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.RTimer;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.RTimer;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.client.solrj.SolrServer;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
-
 import org.apache.solr.util.SolrPluginUtils;
 import org.apache.solr.util.plugin.SolrCoreAware;
-import org.apache.solr.core.SolrCore;
-import org.apache.lucene.queryParser.ParseException;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.HttpClient;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.*;
 
 /**
  *
@@ -199,7 +220,7 @@ public class SearchHandler extends Reque
       subt.stop();
     }
 
-    if (rb.shards == null) {
+    if (!rb.isDistrib) {
       // a normal non-distributed request
 
       // The semantics of debugging vs not debugging are different enough that
@@ -265,6 +286,7 @@ public class SearchHandler extends Reque
             for (String shard : sreq.actualShards) {
               ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
               params.remove(ShardParams.SHARDS);      // not a top-level request
+              params.remove("distrib");               // not a top-level request
               params.remove("indent");
               params.remove(CommonParams.HEADER_ECHO_PARAMS);
               params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
@@ -367,6 +389,8 @@ class HttpCommComponent {
 
 
   static HttpClient client;
+  static Random r = new Random();
+  static LBHttpSolrServer loadbalancer;
 
   static {
     MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
@@ -375,12 +399,29 @@ class HttpCommComponent {
     mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);
     mgr.getParams().setSoTimeout(SearchHandler.soTimeout);
     // mgr.getParams().setStaleCheckingEnabled(false);
-    client = new HttpClient(mgr);    
+
+    client = new HttpClient(mgr);
+
+    // prevent retries  (note: this didn't work when set on mgr.. needed to be set on client)
+    DefaultHttpMethodRetryHandler retryhandler = new DefaultHttpMethodRetryHandler(0, false);
+    client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, retryhandler);
+
+    try {
+      loadbalancer = new LBHttpSolrServer(client);
+    } catch (MalformedURLException e) {
+      // should be impossible since we're not passing any URLs here
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e);
+    }
   }
 
   CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor);
   Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>();
 
+  // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+  // This is primarily to keep track of what order we should use to query the replicas of a shard
+  // so that we use the same replica for all phases of a distributed request.
+  Map<String,List<String>> shardToURLs = new HashMap<String,List<String>>();
+
   HttpCommComponent() {
   }
 
@@ -404,7 +445,36 @@ class HttpCommComponent {
     }
   }
 
+
+  // Not thread safe... don't use in Callable.
+  // Don't modify the returned URL list.
+  private List<String> getURLs(String shard) {
+    List<String> urls = shardToURLs.get(shard);
+    if (urls==null) {
+      urls = StrUtils.splitSmart(shard,"|",true);
+
+      // convert shard to URL
+      for (int i=0; i<urls.size(); i++) {
+        urls.set(i, SearchHandler.scheme + urls.get(i));
+      }
+
+      //
+      // Shuffle the list instead of use round-robin by default.
+      // This prevents accidental synchronization where multiple shards could get in sync
+      // and query the same replica at the same time.
+      //
+      if (urls.size() > 1)
+        Collections.shuffle(urls, r);
+      shardToURLs.put(shard, urls);
+    }
+    return urls;
+  }
+
+
   void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
+    // do this outside of the callable for thread safety reasons
+    final List<String> urls = getURLs(shard);
+
     Callable<ShardResponse> task = new Callable<ShardResponse>() {
       public ShardResponse call() throws Exception {
 
@@ -416,13 +486,9 @@ class HttpCommComponent {
         long startTime = System.currentTimeMillis();
 
         try {
-          // String url = "http://" + shard + "/select";
-          String url = SearchHandler.scheme + shard;
-
           params.remove(CommonParams.WT); // use default (currently javabin)
           params.remove(CommonParams.VERSION);
 
-          SolrServer server = new CommonsHttpSolrServer(url, client);
           // SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
           // use generic request to avoid extra processing of queries
           QueryRequest req = new QueryRequest(params);
@@ -430,10 +496,24 @@ class HttpCommComponent {
 
           // no need to set the response parser as binary is the default
           // req.setResponseParser(new BinaryResponseParser());
-          // srsp.rsp = server.request(req);
-          // srsp.rsp = server.query(sreq.params);
 
-          ssr.nl = server.request(req);
+          // if there are no shards available for a slice, urls.size()==0
+          if (urls.size()==0) {
+            // TODO: what's the right error code here? We should use the same thing when
+            // all of the servers for a shard are down.
+            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
+          }
+
+          if (urls.size() <= 1) {
+            String url = urls.get(0);
+            srsp.setShardAddress(url);
+            SolrServer server = new CommonsHttpSolrServer(url, client);
+            ssr.nl = server.request(req);
+          } else {
+            LBHttpSolrServer.Rsp rsp = loadbalancer.request(new LBHttpSolrServer.Req(req, urls));
+            ssr.nl = rsp.getResponse();
+            srsp.setShardAddress(rsp.getServer());
+          }
         } catch (Throwable th) {
           srsp.setException(th);
           if (th instanceof SolrException) {

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java Wed Oct 13 17:01:13 2010
@@ -41,8 +41,6 @@ public class ShardRequest {
   public int purpose;  // the purpose of this request
 
   public String[] shards;  // the shards this request should be sent to, null for all
-// TODO: how to request a specific shard address?
-
 
   public ModifiableSolrParams params;
 

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java Wed Oct 13 17:01:13 2010
@@ -79,4 +79,9 @@ public final class ShardResponse {
   {
     this.rspCode = rspCode;
   }
+
+  /** What was the shard address that returned this response.  Example:  "http://localhost:8983/solr" */
+  public String getShardAddress() { return this.shardAddress; }
+
+  void setShardAddress(String addr) { this.shardAddress = addr; }
 }

Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java Wed Oct 13 17:01:13 2010
@@ -61,6 +61,7 @@ public class TermsComponent extends Sear
     // TODO: temporary... this should go in a different component.
     String shards = params.get(ShardParams.SHARDS);
     if (shards != null) {
+      rb.isDistrib = true;
       if (params.get(ShardParams.SHARDS_QT) == null) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shards.qt parameter specified");
       }
@@ -298,9 +299,6 @@ public class TermsComponent extends Sear
     // base shard request on original parameters
     sreq.params = new ModifiableSolrParams(params);
 
-    // don't pass through the shards param
-    sreq.params.remove(ShardParams.SHARDS);
-
     // remove any limits for shards, we want them to return all possible
     // responses
     // we want this so we can calculate the correct counts
@@ -310,11 +308,6 @@ public class TermsComponent extends Sear
     sreq.params.set(TermsParams.TERMS_LIMIT, -1);
     sreq.params.set(TermsParams.TERMS_SORT, TermsParams.TERMS_SORT_INDEX);
 
-    // TODO: is there a better way to handle this?
-    String qt = params.get(CommonParams.QT);
-    if (qt != null) {
-      sreq.params.add(CommonParams.QT, qt);
-    }
     return sreq;
   }
 

Added: lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java (added)
+++ lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,148 @@
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+
+public class CloudSolrServer extends SolrServer {
+  private volatile ZkStateReader zkStateReader;
+  private String zkHost; // the zk server address
+  private int zkConnectTimeout = 10000;
+  private int zkClientTimeout = 10000;
+  private String defaultCollection;
+  private LBHttpSolrServer lbServer;
+  Random rand = new Random();
+
+  /**
+   * @param zkHost The address of the zookeeper quorum containing the cloud state
+   */
+  public CloudSolrServer(String zkHost) throws MalformedURLException {
+      this(zkHost, new LBHttpSolrServer());
+  }
+
+  /**
+   * @param zkHost The address of the zookeeper quorum containing the cloud state
+   */
+  public CloudSolrServer(String zkHost, LBHttpSolrServer lbServer) {
+    this.zkHost = zkHost;
+    this.lbServer = lbServer;
+  }
+
+  /** Sets the default collection for request */
+  public void setDefaultCollection(String collection) {
+    this.defaultCollection = collection;
+  }
+
+  /** Set the connect timeout to the zookeeper ensemble in ms */
+  public void setZkConnectTimeout(int zkConnectTimeout) {
+    this.zkConnectTimeout = zkConnectTimeout;
+  }
+
+  /** Set the timeout to the zookeeper ensemble in ms */
+  public void setZkClientTimeout(int zkClientTimeout) {
+    this.zkClientTimeout = zkClientTimeout;
+  }
+
+  /**
+   * Connect to the zookeeper ensemble.
+   * This is an optional method that may be used to force a connect before any other requests are sent.
+   *
+   * @throws IOException
+   * @throws TimeoutException
+   * @throws InterruptedException
+   */
+  public void connect() {
+    if (zkStateReader != null) return;
+    synchronized(this) {
+      if (zkStateReader != null) return;
+      try {
+        ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
+        zk.makeCollectionsNodeWatches();
+        zk.makeShardZkNodeWatches(false);
+        zk.updateCloudState(true);
+        zkStateReader = zk;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+      } catch (KeeperException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+
+      } catch (IOException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+
+      } catch (TimeoutException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+      }
+    }
+  }
+
+
+  @Override
+  public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
+    connect();
+
+    CloudState cloudState = zkStateReader.getCloudState();
+
+    String collection = request.getParams().get("collection", defaultCollection);
+
+    // TODO: allow multiple collections to be specified via comma separated list
+
+    Map<String,Slice> slices = cloudState.getSlices(collection);
+    Set<String> liveNodes = cloudState.getLiveNodes();
+
+    // IDEA: have versions on various things... like a global cloudState version
+    // or shardAddressVersion (which only changes when the shards change)
+    // to allow caching.
+
+    // build a map of unique nodes
+    // TODO: allow filtering by group, role, etc
+    Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
+    List<String> urlList = new ArrayList<String>();
+    for (Slice slice : slices.values()) {
+      for (ZkNodeProps nodeProps : slice.getShards().values()) {
+        String node = nodeProps.get(ZkStateReader.NODE_NAME);
+        if (!liveNodes.contains(node)) continue;
+        if (nodes.put(node, nodeProps) == null) {
+          String url = nodeProps.get(ZkStateReader.URL_PROP);
+          urlList.add(url);
+        }
+      }
+    }
+
+    Collections.shuffle(urlList, rand);
+    // System.out.println("########################## MAKING REQUEST TO " + urlList);
+    // TODO: set distrib=true if we detected more than one shard?
+    LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, urlList);
+    LBHttpSolrServer.Rsp rsp = lbServer.request(req);
+    return rsp.getResponse();
+  }
+
+  public void close() {
+    if (zkStateReader != null) {
+      synchronized(this) {
+        if (zkStateReader!= null)
+          zkStateReader.close();
+        zkStateReader = null;
+      }
+    }
+  }
+}



Mime
View raw message