lucene-solr-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r898977 - in /lucene/solr/branches/cloud/src: java/org/apache/solr/cloud/ test/org/apache/solr/cloud/
Date Wed, 13 Jan 2010 22:29:25 GMT
Author: markrmiller
Date: Wed Jan 13 22:29:24 2010
New Revision: 898977

URL: http://svn.apache.org/viewvc?rev=898977&view=rev
Log:
start some changes - core/shard zknodes now persistent, ephemeral zknodes done at the node
(webapp) level, begin refactor of zk state

Modified:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java Wed Jan 13 22:29:24
2010
@@ -24,11 +24,11 @@
   private Map<String,CollectionInfo> collectionInfos = new HashMap<String,CollectionInfo>();
   
   //nocommit
-  public synchronized void addCollectionInfo(String collection, CollectionInfo collectionInfo)
{
+  public void addCollectionInfo(String collection, CollectionInfo collectionInfo) {
     collectionInfos.put(collection, collectionInfo);
   }
   
-  public synchronized CollectionInfo getCollectionInfo(String collection) {
+  public CollectionInfo getCollectionInfo(String collection) {
     return collectionInfos.get(collection);
   }
 }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java Wed Jan
13 22:29:24 2010
@@ -44,22 +44,27 @@
   static final String ROLE_PROP = "role";
   
   // maps shard name to the shard addresses and roles
-  private final Map<String,ShardInfoList> shardNameToShardInfoList;
+  private final Map<String,Properties> shards;
   private final long updateTime;
 
-  public CollectionInfo(Map<String,ShardInfoList> shardNameToShardInfoList) {
+  private final List<String> nodes;
+
+
+
+  public CollectionInfo(Map<String,Properties> shards, List<String> nodes) {
     //nocommit: defensive copy?
-    this.shardNameToShardInfoList = shardNameToShardInfoList;
+    this.shards = shards;
     this.updateTime = System.currentTimeMillis();
+    this.nodes = nodes;
   }
   
-  public CollectionInfo(SolrZkClient client, String path) throws KeeperException, InterruptedException,
IOException {
-    //nocommit: 
-    // build immutable CollectionInfo
-    shardNameToShardInfoList = readShardInfo(client, path);
-    
-    this.updateTime = System.currentTimeMillis();
-  }
+//  public CollectionInfo(SolrZkClient client, String path) throws KeeperException, InterruptedException,
IOException {
+//    //nocommit: 
+//    // build immutable CollectionInfo
+//    shardNameToShardInfoList = readShardInfo(client, path);
+//    nodes = client.getChildren(path, null);
+//    this.updateTime = System.currentTimeMillis();
+//  }
   
   /**
    * Read info on the available Shards and Nodes.
@@ -80,7 +85,7 @@
       throw new IllegalStateException("Cannot find zk shards node that should exist:"
           + path);
     }
-    List<String> nodes = zkClient.getChildren(path, null);
+
 
     for (String zkNodeName : nodes) {
       byte[] data = zkClient.getData(path + "/" + zkNodeName, null,
@@ -120,18 +125,17 @@
    * 
    * @return
    */
-  public List<String> getSearchShards() {
-    List<String> nodeList = new ArrayList<String>();
-    for (ShardInfoList nodes : shardNameToShardInfoList.values()) {
-      nodeList.add(nodes.getShardUrl());
-    }
-    return nodeList;
-  }
-
-  public ShardInfoList getShardInfoList(String shardName) {
-    return shardNameToShardInfoList.get(shardName);
-  }
+//  public List<String> getSearchShards() {
+//    List<String> nodeList = new ArrayList<String>();
+//    for (ShardInfoList nodes : shardNameToShardInfoList.values()) {
+//      nodeList.add(nodes.getShardUrl());
+//    }
+//    return nodeList;
+//  }
   
+  public List<String> getNodes() {
+    return Collections.unmodifiableList(nodes);
+  }
 
   /**
    * @return last time info was updated.

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java Wed Jan
13 22:29:24 2010
@@ -101,32 +101,10 @@
 
       log.info("Connected:" + connected);
     } else if (state == KeeperState.Disconnected) {
-      // nocommit : not sure we have to reconnect like this on disconnect
-      if(connected == false) {
-        // nocommit
-        System.out.println("we already know we are dc'd - why are we notified twice?");
-        return;
-      }
+      // nocommit : not sure we have to reconnect on disconnect
+      // ZooKeeper will recover when it can
       connected = false;
-      // nocommit: start reconnect attempts - problem if this is shutdown related?
 
-      try {
-        connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate()
{
-          @Override
-          public void update(ZooKeeper keeper) throws InterruptedException, TimeoutException,
IOException {
-           waitForConnected(SolrZkClient.CONNECT_TIMEOUT);
-           client.updateKeeper(keeper);
-           if(onReconnect != null) {
-             onReconnect.command();
-           }
-           ConnectionManager.this.connected = true;
-          }
-        });
-      } catch (Exception e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-      
     } else {
       connected = false;
     }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java Wed Jan 13
22:29:24 2010
@@ -42,10 +42,15 @@
   public void process(WatchedEvent event) {
     // nocommit : this will be called too often as shards register themselves?
     System.out.println("shard node changed");
+   
 
     try {
+      // nocommit:
+      controller.printLayoutToStdOut();
       // nocommit : refresh watcher
-      // controller.getKeeperConnection().exists(event.getPath(), this);
+      
+      // nocommit : rewatch
+      controller.getZkClient().exists(event.getPath(), this);
 
       // TODO: need to load whole state?
       controller.readCloudInfo();

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java Wed Jan 13
22:29:24 2010
@@ -240,6 +240,11 @@
       InterruptedException {
     makePath(path, null, CreateMode.PERSISTENT);
   }
+  
+  public void makePath(String path, CreateMode createMode) throws KeeperException,
+      InterruptedException {
+    makePath(path, null, createMode);
+  }
 
   /**
    * Creates the path in ZooKeeper, creating each node as necessary.

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Wed Jan 13
22:29:24 2010
@@ -26,9 +26,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -89,6 +91,8 @@
   private String localHostName;
   private String localHost;
 
+  private String hostName;
+
   /**
    * 
    * @param zkServerAddress ZooKeeper server host address
@@ -142,7 +146,7 @@
    * nocommit: adds nodes if they don't exist, eg /shards/ node. consider race
    * conditions
    */
-  private void addZkShardsNode(String collection) throws IOException {
+  private void addZkCoresNode(String collection) throws IOException {
 
     String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE;
     try {
@@ -309,24 +313,24 @@
   }
 
   // nocommit - testing
-  public String getSearchNodes(String collection) {
-    StringBuilder nodeString = new StringBuilder();
-    boolean first = true;
-    List<String> nodes;
-
-    nodes = cloudInfo.getCollectionInfo(collection).getSearchShards();
-    // nocommit
-    System.out.println("there are " + nodes.size() + " node(s)");
-    for (String node : nodes) {
-      nodeString.append(node);
-      if (first) {
-        first = false;
-      } else {
-        nodeString.append(',');
-      }
-    }
-    return nodeString.toString();
-  }
+//  public String getSearchNodes(String collection) {
+//    StringBuilder nodeString = new StringBuilder();
+//    boolean first = true;
+//    List<String> nodes;
+//
+//    nodes = cloudInfo.getCollectionInfo(collection).getSearchShards();
+//    // nocommit
+//    System.out.println("there are " + nodes.size() + " node(s)");
+//    for (String node : nodes) {
+//      nodeString.append(node);
+//      if (first) {
+//        first = false;
+//      } else {
+//        nodeString.append(',');
+//      }
+//    }
+//    return nodeString.toString();
+//  }
 
   SolrZkClient getZkClient() {
     return zkClient;
@@ -344,8 +348,9 @@
     try {
       localHostName = getHostAddress();
       Matcher m = URL_POST.matcher(localHostName);
+
       if (m.matches()) {
-        String hostName = m.group(1);
+        hostName = m.group(1);
         // register host
         zkClient.makePath(hostName);
       } else {
@@ -354,8 +359,24 @@
             + localHostName);
       }
       
+      // makes nodes node
+      try {
+        zkClient.makePath("/nodes");
+      } catch (KeeperException e) {
+        // its okay if another beats us creating the node
+        if (e.code() != KeeperException.Code.NODEEXISTS) {
+          log.error("ZooKeeper Exception", e);
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+              "ZooKeeper Exception", e);
+        }
+      }
+      String nodeName = hostName + ":" + localHostPort + "_"+ localHostContext;
+      zkClient.makePath("/nodes/" + nodeName, CreateMode.EPHEMERAL);
+      
       // nocommit
       setUpCollectionsNode();
+      
+      
 
     } catch (IOException e) {
       log.error("", e);
@@ -385,7 +406,11 @@
     for (String collection : collections) {
       String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection
           + SHARDS_ZKNODE;
-      CollectionInfo collectionInfo = new CollectionInfo(zkClient, shardsZkPath);
+      
+      List<String> nodes = zkClient.getChildren(shardsZkPath, null);
+      Map<String,Properties> shards = readShardsInfo(collection, shardsZkPath, nodes);
+ 
+      CollectionInfo collectionInfo = new CollectionInfo(shards, nodes);
       cloudInfo.addCollectionInfo(collection, collectionInfo);
     }
 
@@ -470,21 +495,22 @@
    * Read info on the available Shards and Nodes.
    * 
    * @param path to the shards zkNode
+   * @param shardsZkPath 
+   * @param nodeList 
    * @return Map from shard name to a {@link ShardInfoList}
    * @throws InterruptedException
    * @throws KeeperException
    * @throws IOException
    */
-  public Map<String,ShardInfoList> readShardsNode(String path)
+  public Map<String,Properties> readShardsInfo(String collection, String path, List<String>
nodes)
       throws KeeperException, InterruptedException, IOException {
-
-    HashMap<String,ShardInfoList> shardNameToShardList = new HashMap<String,ShardInfoList>();
-
-    if (!zkClient.exists(path)) {
-      throw new IllegalStateException("Cannot find zk node that should exist:"
-          + path);
+    Set<String> nodesSet = new HashSet<String>(nodes.size());
+    nodesSet.addAll(nodes);
+    if(cloudInfo != null) {
+      List<String> oldNodes = cloudInfo.getCollectionInfo(collection).getNodes();
     }
-    List<String> nodes = zkClient.getChildren(path, null);
+    
+    Map<String,Properties> cores = new HashMap<String,Properties>();
 
     for (String zkNodeName : nodes) {
       byte[] data = zkClient.getData(path + "/" + zkNodeName, null, null);
@@ -492,30 +518,11 @@
       Properties props = new Properties();
       props.load(new ByteArrayInputStream(data));
 
-      String url = (String) props.get(CollectionInfo.URL_PROP);
-      String shardNameList = (String) props.get(CollectionInfo.SHARD_LIST_PROP);
-      String[] shardsNames = shardNameList.split(",");
-      for (String shardName : shardsNames) {
-        ShardInfoList sList = shardNameToShardList.get(shardName);
-        List<ShardInfo> shardList;
-        if (sList == null) {
-          shardList = new ArrayList<ShardInfo>(1);
-        } else {
-          List<ShardInfo> oldShards = sList.getShards();
-          shardList = new ArrayList<ShardInfo>(oldShards.size() + 1);
-          shardList.addAll(oldShards);
-        }
-
-        ShardInfo shard = new ShardInfo(url);
-        shardList.add(shard);
-        ShardInfoList list = new ShardInfoList(shardList);
-
-        shardNameToShardList.put(shardName, list);
-      }
+      cores.put(zkNodeName, props);
 
     }
 
-    return Collections.unmodifiableMap(shardNameToShardList);
+    return Collections.unmodifiableMap(cores);
   }
 
   /**
@@ -548,7 +555,7 @@
 
     // build layout if not exists
     // nocommit : consider how we watch shards on all collections
-    addZkShardsNode(collection);
+    addZkCoresNode(collection);
 
     // create node
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -565,8 +572,19 @@
 
     props.store(baos, PROPS_DESC);
 
-    nodePath = zkClient.create(shardsZkPath + CORE_ZKPREFIX,
-        baos.toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL);
+    String nodeName = hostName + ":" + localHostPort + "_"+ localHostContext + (coreName.length()
== 0 ? "" : "_" + coreName);
+    try {
+      nodePath = zkClient.create(shardsZkPath + "/" + nodeName,
+        baos.toByteArray(), CreateMode.PERSISTENT);
+    } catch (KeeperException e) {
+      // its okay if the node already exists
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        throw e;
+      }
+    }
+    
+    // signal that the shards node has changed
+    zkClient.setData(shardsZkPath, (byte[])null);
 
     return nodePath;
   }
@@ -627,8 +645,14 @@
 
       public void process(WatchedEvent event) {
         System.out.println("Collections node event:" + event);
+        // nocommit : if collections node was signaled, look for new collections
         
       }});
+    
+    collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+    for(String collection : collections) {
+      zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/shards", shardWatcher);
+    }
   }
 
   private void setUpCollectionsNode() throws KeeperException, InterruptedException {

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java Wed Jan 13
22:29:24 2010
@@ -106,8 +106,6 @@
     assertU(delQ("id:[100 TO 110]"));
     assertU(commit());
     assertQ(req("id:[100 TO 110]"), "//*[@numFound='0']");
-    
-    //nocommit
-    System.out.println("search nodes:" + h.getCoreContainer().getZooKeeperController().getSearchNodes("DEFAULT_CORE"));
+
   }
 }

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java Wed Jan
13 22:29:24 2010
@@ -20,6 +20,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -82,35 +83,20 @@
 
       zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT,
           "localhost", "8983", "/solr");
-      Map<String,ShardInfoList> shardInfoMap = zkController
-          .readShardsNode(shardsPath);
-      assertTrue(shardInfoMap.size() > 0);
+      zkController.readCloudInfo();
+      CloudInfo cloudInfo = zkController.getCloudInfo();
+      CollectionInfo collectionInfo = cloudInfo.getCollectionInfo("collection1");
+      assertNotNull(collectionInfo);
 
-      Set<Entry<String,ShardInfoList>> entries = shardInfoMap.entrySet();
 
       if (DEBUG) {
-        for (Entry<String,ShardInfoList> entry : entries) {
-          System.out.println("shard:" + entry.getKey() + " value:"
-              + entry.getValue().toString());
+        for (String node : collectionInfo.getNodes()) {
+          System.out.println("shard:" + node);
         }
       }
 
-      Set<String> keys = shardInfoMap.keySet();
-
-      assertTrue(keys.size() == 2);
-
-      assertTrue(keys.contains(SHARD1));
-      assertTrue(keys.contains(SHARD2));
-
-      ShardInfoList shardInfoList = shardInfoMap.get(SHARD1);
-
-      assertEquals(3, shardInfoList.getShards().size());
-
-      shardInfoList = shardInfoMap.get(SHARD2);
-
-      assertEquals(1, shardInfoList.getShards().size());
-
-      assertEquals(URL1, shardInfoList.getShards().get(0).getUrl());
+      // nocommit : check properties 
+      
     } finally {
       if (zkClient != null) {
         zkClient.close();



Mime
View raw message