Return-Path: Delivered-To: apmail-lucene-solr-commits-archive@minotaur.apache.org Received: (qmail 7264 invoked from network); 13 Jan 2010 22:29:58 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 13 Jan 2010 22:29:57 -0000 Received: (qmail 72706 invoked by uid 500); 13 Jan 2010 22:29:57 -0000 Delivered-To: apmail-lucene-solr-commits-archive@lucene.apache.org Received: (qmail 72667 invoked by uid 500); 13 Jan 2010 22:29:57 -0000 Mailing-List: contact solr-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: solr-dev@lucene.apache.org Delivered-To: mailing list solr-commits@lucene.apache.org Received: (qmail 72658 invoked by uid 99); 13 Jan 2010 22:29:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jan 2010 22:29:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jan 2010 22:29:47 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8A35B23889ED; Wed, 13 Jan 2010 22:29:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r898977 - in /lucene/solr/branches/cloud/src: java/org/apache/solr/cloud/ test/org/apache/solr/cloud/ Date: Wed, 13 Jan 2010 22:29:25 -0000 To: solr-commits@lucene.apache.org From: markrmiller@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100113222925.8A35B23889ED@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: markrmiller Date: Wed Jan 13 22:29:24 2010 New Revision: 898977 URL: http://svn.apache.org/viewvc?rev=898977&view=rev Log: start some changes - core/shard zknodes now persistent, ephemeral zknodes done at the node (webapp) level, begin refactor of zk state Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java?rev=898977&r1=898976&r2=898977&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java (original) +++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java Wed Jan 13 22:29:24 2010 @@ -24,11 +24,11 @@ private Map collectionInfos = new HashMap(); //nocommit - public synchronized void addCollectionInfo(String collection, CollectionInfo collectionInfo) { + public void addCollectionInfo(String collection, CollectionInfo collectionInfo) { collectionInfos.put(collection, collectionInfo); } - public synchronized CollectionInfo getCollectionInfo(String collection) { + public CollectionInfo getCollectionInfo(String collection) { return collectionInfos.get(collection); } } Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java?rev=898977&r1=898976&r2=898977&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java (original) +++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java Wed Jan 13 22:29:24 2010 @@ -44,22 +44,27 @@ static final String ROLE_PROP = "role"; // maps shard name to the shard addresses and roles - private final Map shardNameToShardInfoList; + private final Map shards; private final long updateTime; - public CollectionInfo(Map shardNameToShardInfoList) { + private final List nodes; + + + + public CollectionInfo(Map shards, List nodes) { //nocommit: defensive copy? - this.shardNameToShardInfoList = shardNameToShardInfoList; + this.shards = shards; this.updateTime = System.currentTimeMillis(); + this.nodes = nodes; } - public CollectionInfo(SolrZkClient client, String path) throws KeeperException, InterruptedException, IOException { - //nocommit: - // build immutable CollectionInfo - shardNameToShardInfoList = readShardInfo(client, path); - - this.updateTime = System.currentTimeMillis(); - } +// public CollectionInfo(SolrZkClient client, String path) throws KeeperException, InterruptedException, IOException { +// //nocommit: +// // build immutable CollectionInfo +// shardNameToShardInfoList = readShardInfo(client, path); +// nodes = client.getChildren(path, null); +// this.updateTime = System.currentTimeMillis(); +// } /** * Read info on the available Shards and Nodes. @@ -80,7 +85,7 @@ throw new IllegalStateException("Cannot find zk shards node that should exist:" + path); } - List nodes = zkClient.getChildren(path, null); + for (String zkNodeName : nodes) { byte[] data = zkClient.getData(path + "/" + zkNodeName, null, @@ -120,18 +125,17 @@ * * @return */ - public List getSearchShards() { - List nodeList = new ArrayList(); - for (ShardInfoList nodes : shardNameToShardInfoList.values()) { - nodeList.add(nodes.getShardUrl()); - } - return nodeList; - } - - public ShardInfoList getShardInfoList(String shardName) { - return shardNameToShardInfoList.get(shardName); - } +// public List getSearchShards() { +// List nodeList = new ArrayList(); +// for (ShardInfoList nodes : shardNameToShardInfoList.values()) { +// nodeList.add(nodes.getShardUrl()); +// } +// return nodeList; +// } + public List getNodes() { + return Collections.unmodifiableList(nodes); + } /** * @return last time info was updated. Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java?rev=898977&r1=898976&r2=898977&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java (original) +++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java Wed Jan 13 22:29:24 2010 @@ -101,32 +101,10 @@ log.info("Connected:" + connected); } else if (state == KeeperState.Disconnected) { - // nocommit : not sure we have to reconnect like this on disconnect - if(connected == false) { - // nocommit - System.out.println("we already know we are dc'd - why are we notified twice?"); - return; - } + // nocommit : not sure we have to reconnect on disconnect + // ZooKeeper will recover when it can connected = false; - // nocommit: start reconnect attempts - problem if this is shutdown related? - try { - connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() { - @Override - public void update(ZooKeeper keeper) throws InterruptedException, TimeoutException, IOException { - waitForConnected(SolrZkClient.CONNECT_TIMEOUT); - client.updateKeeper(keeper); - if(onReconnect != null) { - onReconnect.command(); - } - ConnectionManager.this.connected = true; - } - }); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } else { connected = false; } Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java?rev=898977&r1=898976&r2=898977&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java (original) +++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java Wed Jan 13 22:29:24 2010 @@ -42,10 +42,15 @@ public void process(WatchedEvent event) { // nocommit : this will be called too often as shards register themselves? System.out.println("shard node changed"); + try { + // nocommit: + controller.printLayoutToStdOut(); // nocommit : refresh watcher - // controller.getKeeperConnection().exists(event.getPath(), this); + + // nocommit : rewatch + controller.getZkClient().exists(event.getPath(), this); // TODO: need to load whole state? controller.readCloudInfo(); Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java?rev=898977&r1=898976&r2=898977&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java (original) +++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java Wed Jan 13 22:29:24 2010 @@ -240,6 +240,11 @@ InterruptedException { makePath(path, null, CreateMode.PERSISTENT); } + + public void makePath(String path, CreateMode createMode) throws KeeperException, + InterruptedException { + makePath(path, null, createMode); + } /** * Creates the path in ZooKeeper, creating each node as necessary. Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=898977&r1=898976&r2=898977&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original) +++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Wed Jan 13 22:29:24 2010 @@ -26,9 +26,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -89,6 +91,8 @@ private String localHostName; private String localHost; + private String hostName; + /** * * @param zkServerAddress ZooKeeper server host address @@ -142,7 +146,7 @@ * nocommit: adds nodes if they don't exist, eg /shards/ node. consider race * conditions */ - private void addZkShardsNode(String collection) throws IOException { + private void addZkCoresNode(String collection) throws IOException { String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE; try { @@ -309,24 +313,24 @@ } // nocommit - testing - public String getSearchNodes(String collection) { - StringBuilder nodeString = new StringBuilder(); - boolean first = true; - List nodes; - - nodes = cloudInfo.getCollectionInfo(collection).getSearchShards(); - // nocommit - System.out.println("there are " + nodes.size() + " node(s)"); - for (String node : nodes) { - nodeString.append(node); - if (first) { - first = false; - } else { - nodeString.append(','); - } - } - return nodeString.toString(); - } +// public String getSearchNodes(String collection) { +// StringBuilder nodeString = new StringBuilder(); +// boolean first = true; +// List nodes; +// +// nodes = cloudInfo.getCollectionInfo(collection).getSearchShards(); +// // nocommit +// System.out.println("there are " + nodes.size() + " node(s)"); +// for (String node : nodes) { +// nodeString.append(node); +// if (first) { +// first = false; +// } else { +// nodeString.append(','); +// } +// } +// return nodeString.toString(); +// } SolrZkClient getZkClient() { return zkClient; @@ -344,8 +348,9 @@ try { localHostName = getHostAddress(); Matcher m = URL_POST.matcher(localHostName); + if (m.matches()) { - String hostName = m.group(1); + hostName = m.group(1); // register host zkClient.makePath(hostName); } else { @@ -354,8 +359,24 @@ + localHostName); } + // makes nodes node + try { + zkClient.makePath("/nodes"); + } catch (KeeperException e) { + // its okay if another beats us creating the node + if (e.code() != KeeperException.Code.NODEEXISTS) { + log.error("ZooKeeper Exception", e); + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "ZooKeeper Exception", e); + } + } + String nodeName = hostName + ":" + localHostPort + "_"+ localHostContext; + zkClient.makePath("/nodes/" + nodeName, CreateMode.EPHEMERAL); + // nocommit setUpCollectionsNode(); + + } catch (IOException e) { log.error("", e); @@ -385,7 +406,11 @@ for (String collection : collections) { String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE; - CollectionInfo collectionInfo = new CollectionInfo(zkClient, shardsZkPath); + + List nodes = zkClient.getChildren(shardsZkPath, null); + Map shards = readShardsInfo(collection, shardsZkPath, nodes); + + CollectionInfo collectionInfo = new CollectionInfo(shards, nodes); cloudInfo.addCollectionInfo(collection, collectionInfo); } @@ -470,21 +495,22 @@ * Read info on the available Shards and Nodes. * * @param path to the shards zkNode + * @param shardsZkPath + * @param nodeList * @return Map from shard name to a {@link ShardInfoList} * @throws InterruptedException * @throws KeeperException * @throws IOException */ - public Map readShardsNode(String path) + public Map readShardsInfo(String collection, String path, List nodes) throws KeeperException, InterruptedException, IOException { - - HashMap shardNameToShardList = new HashMap(); - - if (!zkClient.exists(path)) { - throw new IllegalStateException("Cannot find zk node that should exist:" - + path); + Set nodesSet = new HashSet(nodes.size()); + nodesSet.addAll(nodes); + if(cloudInfo != null) { + List oldNodes = cloudInfo.getCollectionInfo(collection).getNodes(); } - List nodes = zkClient.getChildren(path, null); + + Map cores = new HashMap(); for (String zkNodeName : nodes) { byte[] data = zkClient.getData(path + "/" + zkNodeName, null, null); @@ -492,30 +518,11 @@ Properties props = new Properties(); props.load(new ByteArrayInputStream(data)); - String url = (String) props.get(CollectionInfo.URL_PROP); - String shardNameList = (String) props.get(CollectionInfo.SHARD_LIST_PROP); - String[] shardsNames = shardNameList.split(","); - for (String shardName : shardsNames) { - ShardInfoList sList = shardNameToShardList.get(shardName); - List shardList; - if (sList == null) { - shardList = new ArrayList(1); - } else { - List oldShards = sList.getShards(); - shardList = new ArrayList(oldShards.size() + 1); - shardList.addAll(oldShards); - } - - ShardInfo shard = new ShardInfo(url); - shardList.add(shard); - ShardInfoList list = new ShardInfoList(shardList); - - shardNameToShardList.put(shardName, list); - } + cores.put(zkNodeName, props); } - return Collections.unmodifiableMap(shardNameToShardList); + return Collections.unmodifiableMap(cores); } /** @@ -548,7 +555,7 @@ // build layout if not exists // nocommit : consider how we watch shards on all collections - addZkShardsNode(collection); + addZkCoresNode(collection); // create node ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -565,8 +572,19 @@ props.store(baos, PROPS_DESC); - nodePath = zkClient.create(shardsZkPath + CORE_ZKPREFIX, - baos.toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL); + String nodeName = hostName + ":" + localHostPort + "_"+ localHostContext + (coreName.length() == 0 ? "" : "_" + coreName); + try { + nodePath = zkClient.create(shardsZkPath + "/" + nodeName, + baos.toByteArray(), CreateMode.PERSISTENT); + } catch (KeeperException e) { + // its okay if the node already exists + if (e.code() != KeeperException.Code.NODEEXISTS) { + throw e; + } + } + + // signal that the shards node has changed + zkClient.setData(shardsZkPath, (byte[])null); return nodePath; } @@ -627,8 +645,14 @@ public void process(WatchedEvent event) { System.out.println("Collections node event:" + event); + // nocommit : if collections node was signaled, look for new collections }}); + + collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null); + for(String collection : collections) { + zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/shards", shardWatcher); + } } private void setUpCollectionsNode() throws KeeperException, InterruptedException { Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java?rev=898977&r1=898976&r2=898977&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java (original) +++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java Wed Jan 13 22:29:24 2010 @@ -106,8 +106,6 @@ assertU(delQ("id:[100 TO 110]")); assertU(commit()); assertQ(req("id:[100 TO 110]"), "//*[@numFound='0']"); - - //nocommit - System.out.println("search nodes:" + h.getCoreContainer().getZooKeeperController().getSearchNodes("DEFAULT_CORE")); + } } Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=898977&r1=898976&r2=898977&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java (original) +++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java Wed Jan 13 22:29:24 2010 @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -82,35 +83,20 @@ zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT, "localhost", "8983", "/solr"); - Map shardInfoMap = zkController - .readShardsNode(shardsPath); - assertTrue(shardInfoMap.size() > 0); + zkController.readCloudInfo(); + CloudInfo cloudInfo = zkController.getCloudInfo(); + CollectionInfo collectionInfo = cloudInfo.getCollectionInfo("collection1"); + assertNotNull(collectionInfo); - Set> entries = shardInfoMap.entrySet(); if (DEBUG) { - for (Entry entry : entries) { - System.out.println("shard:" + entry.getKey() + " value:" - + entry.getValue().toString()); + for (String node : collectionInfo.getNodes()) { + System.out.println("shard:" + node); } } - Set keys = shardInfoMap.keySet(); - - assertTrue(keys.size() == 2); - - assertTrue(keys.contains(SHARD1)); - assertTrue(keys.contains(SHARD2)); - - ShardInfoList shardInfoList = shardInfoMap.get(SHARD1); - - assertEquals(3, shardInfoList.getShards().size()); - - shardInfoList = shardInfoMap.get(SHARD2); - - assertEquals(1, shardInfoList.getShards().size()); - - assertEquals(URL1, shardInfoList.getShards().get(0).getUrl()); + // nocommit : check properties + } finally { if (zkClient != null) { zkClient.close();