lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1210505 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/sol...
Date Mon, 05 Dec 2011 15:47:46 GMT
Author: markrmiller
Date: Mon Dec  5 15:47:46 2011
New Revision: 1210505

URL: http://svn.apache.org/viewvc?rev=1210505&view=rev
Log:
dump my latest

Added:
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicFullDistributedZkTest.java
  (with props)
Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
Mon Dec  5 15:47:46 2011
@@ -25,9 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Dec  5 15:47:46 2011
@@ -73,10 +73,10 @@ public class Overseer implements NodeSta
       createZkNodes(zkClient);
       createClusterStateWatchersAndUpdate();
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
+      // nocommit
       e.printStackTrace();
     } catch (KeeperException e) {
-      // TODO Auto-generated catch block
+      // nocommit
       e.printStackTrace();
     } catch (IOException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"",e);
@@ -167,12 +167,13 @@ public class Overseer implements NodeSta
           @Override
           public void process(WatchedEvent event) {
             try {
-              List<String> liveNodes = zkClient.getChildren(
-                  ZkStateReader.LIVE_NODES_ZKNODE, this);
-              Set<String> liveNodesSet = new HashSet<String>();
-              liveNodesSet.addAll(liveNodes);
-              processLiveNodesChanged(cloudState.getLiveNodes(), liveNodes);
               synchronized (cloudState) {
+                List<String> liveNodes = zkClient.getChildren(
+                    ZkStateReader.LIVE_NODES_ZKNODE, this);
+                Set<String> liveNodesSet = new HashSet<String>();
+                liveNodesSet.addAll(liveNodes);
+                processLiveNodesChanged(cloudState.getLiveNodes(), liveNodes);
+                
                 cloudState = new CloudState(liveNodesSet, cloudState
                     .getCollectionStates());
                 // TODO: why are we syncing on cloudState and not the update
@@ -322,6 +323,7 @@ public class Overseer implements NodeSta
    */
   private void publishCloudState() {
     try {
+      System.out.println("publish:" + cloudState.getCollections());
       CloudStateUtility.update(zkClient, cloudState, null);
     } catch (KeeperException e) {
       log.error("Could not publish cloud state.", e);
@@ -424,7 +426,7 @@ public class Overseer implements NodeSta
     try {
       zkClient.setData("/node_assignments/" + node, content);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
+      // nocommit
       e.printStackTrace();
     }
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
Mon Dec  5 15:47:46 2011
@@ -547,6 +547,14 @@ public final class ZkController {
         core.close();
       }
     }
+    
+    // nocommit: GRRRR - sometimes nodes are hit before they know about cluster
+    // state - this doesnt help much though...
+    while (!zkStateReader.getCloudState().getCollections().contains(collection)) {
+      Thread.sleep(500);
+    }
+    // make sure we have an update cluster state right away
+    zkStateReader.updateCloudState(true);
 
     return shardId;
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
Mon Dec  5 15:47:46 2011
@@ -125,6 +125,7 @@ public class SolrCmdDistributor {
     clone.solrDoc = cmd.solrDoc;
     clone.commitWithin = cmd.commitWithin;
     clone.overwrite = cmd.overwrite;
+    clone.setVersion(cmd.getVersion());
     
     // nocommit: review as far as SOLR-2685
     // clone.indexedId = cmd.indexedId;
@@ -254,7 +255,10 @@ public class SolrCmdDistributor {
   
   private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
     DeleteUpdateCommand c = (DeleteUpdateCommand)cmd.clone();
-    cmd.setReq(req);
+    // TODO: shouldnt the clone do this?
+    c.setReq(req);
+    c.setFlags(cmd.getFlags());
+    c.setVersion(cmd.getVersion());
     return c;
   }
   

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Mon Dec  5 15:47:46 2011
@@ -201,7 +201,7 @@ public class DistributedUpdateProcessor 
     
     if (slices == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find collection "
-          + collection + " in " + cloudState.getCollections());
+          + collection + " in " + cloudState);
     }
     
     Set<String> shards = slices.keySet();
@@ -600,7 +600,7 @@ public class DistributedUpdateProcessor 
    
     Map<String,Slice> slices = cloudState.getSlices(collection);
     if (slices == null) {
-      throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find collection in zk: " + collection);
+      throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find collection in zk: " + cloudState);
     }
     
     Slice replicas = slices.get(shardId);

Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicFullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicFullDistributedZkTest.java?rev=1210505&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicFullDistributedZkTest.java
(added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicFullDistributedZkTest.java
Mon Dec  5 15:47:46 2011
@@ -0,0 +1,101 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.junit.BeforeClass;
+
+/**
+ * Super basic testing, no shard restarting or anything.
+ */
+public class BasicFullDistributedZkTest extends FullDistributedZkTest {
+  
+  
+  @BeforeClass
+  public static void beforeSuperClass() throws Exception {
+    
+  }
+  
+  public BasicFullDistributedZkTest() {
+    super();
+    shardCount = 4;
+    sliceCount = 2;
+  }
+  
+  @Override
+  public void doTest() throws Exception {
+    // GRRRRR - this is needed because it takes a while for all the shards to learn about the cluster state
+    Thread.sleep(5000);
+    
+    handle.clear();
+    handle.put("QTime", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+    
+    del("*:*");
+    
+    // add a doc, update it, and delete it
+    
+    String docId = "99999999";
+    indexr("id", docId, t1, "originalcontent");
+    
+    commit();
+    
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.add("distrib", "true");
+    params.add("q", t1 + ":originalcontent");
+    QueryResponse results = clients.get(0).query(params);
+    assertEquals(1, results.getResults().getNumFound());
+    System.out.println("results:" + results);
+    
+    // update doc
+    indexr("id", docId, t1, "updatedcontent");
+    
+    commit();
+    
+    assertDocCounts();
+    
+    results = clients.get(0).query(params);
+    System.out.println("results1:" + results.getResults());
+    assertEquals(0, results.getResults().getNumFound());
+    
+    params.set("q", t1 + ":updatedcontent");
+    
+    results = clients.get(0).query(params);
+    assertEquals(1, results.getResults().getNumFound());
+    
+    UpdateRequest uReq = new UpdateRequest();
+    uReq.setParam(UpdateParams.UPDATE_CHAIN, DISTRIB_UPDATE_CHAIN);
+    uReq.deleteById(docId).process(clients.get(0));
+    
+    commit();
+    
+    System.out.println("results2:" + results.getResults());
+    
+    results = clients.get(0).query(params);
+    assertEquals(0, results.getResults().getNumFound());
+  }
+  
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+}

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
Mon Dec  5 15:47:46 2011
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -36,6 +38,7 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.RecoveryStrat.RecoveryListener;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.Slice;
@@ -43,16 +46,21 @@ import org.apache.solr.common.cloud.ZkNo
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.servlet.SolrDispatchFilter;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 
 /**
  *
  */
+@Ignore("Trying to figure out an issue")
 public class FullDistributedZkTest extends AbstractDistributedZkTestCase {
   
+  static final String DISTRIB_UPDATE_CHAIN = "distrib-update-chain";
+
   private static final String DEFAULT_COLLECTION = "collection1";
 
   String t1="a_t";
@@ -71,7 +79,7 @@ public class FullDistributedZkTest exten
   String oddField="oddField_s";
   String missingField="ignore_exception__missing_but_valid_field_t";
   String invalidField="ignore_exception__invalid_field_not_in_schema";
-  protected int sliceCount = 4;
+  protected int sliceCount;
   
   protected volatile CloudSolrServer cloudClient;
   
@@ -181,7 +189,8 @@ public class FullDistributedZkTest exten
   public FullDistributedZkTest() {
     fixShardCount = true;
     
-    shardCount = 12;
+    shardCount = 6;
+    sliceCount = 3;
     // TODO: for now, turn off stress because it uses regular clients, and we 
     // need the cloud client because we kill servers
     stress = 0;
@@ -191,10 +200,15 @@ public class FullDistributedZkTest exten
   
   protected void initCloud() throws Exception {
     if (zkStateReader == null) {
-      zkStateReader = new ZkStateReader(zkServer.getZkAddress(), 10000,
-          AbstractZkTestCase.TIMEOUT);
-
-      zkStateReader.createClusterStateWatchersAndUpdate();
+      synchronized (this) {
+        if (zkStateReader != null) {
+          return;
+        }
+        zkStateReader = new ZkStateReader(zkServer.getZkAddress(), 10000,
+            AbstractZkTestCase.TIMEOUT);
+        
+        zkStateReader.createClusterStateWatchersAndUpdate();
+      }
     }
     
     // wait until shards have started registering...
@@ -208,6 +222,9 @@ public class FullDistributedZkTest exten
     // use the distributed solrj client
     if (cloudClient == null) {
       synchronized(this) {
+        if (cloudClient != null) {
+          return;
+        }
         try {
           CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
           server.setDefaultCollection(DEFAULT_COLLECTION);
@@ -231,7 +248,7 @@ public class FullDistributedZkTest exten
 
   }
 
-  private void createJettys(int numJettys) throws Exception,
+  private List<JettySolrRunner> createJettys(int numJettys) throws Exception,
       InterruptedException, TimeoutException, IOException, KeeperException,
       URISyntaxException {
     List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
@@ -259,6 +276,7 @@ public class FullDistributedZkTest exten
       sb.append("|localhost:").append(j2.getLocalPort()).append(context);
     }
     shards = sb.toString();
+    return jettys;
   }
 
   protected void updateMappingsFromZk(List<JettySolrRunner> jettys,
@@ -365,7 +383,7 @@ public class FullDistributedZkTest exten
 
     UpdateRequest ureq = new UpdateRequest();
     ureq.add(doc);
-    ureq.setParam("update.chain", "distrib-update-chain");
+    ureq.setParam(UpdateParams.UPDATE_CHAIN, DISTRIB_UPDATE_CHAIN);
     ureq.process(client);
   }
   
@@ -380,7 +398,7 @@ public class FullDistributedZkTest exten
 
     UpdateRequest ureq = new UpdateRequest();
     ureq.add(doc);
-    ureq.setParam("update.chain", "distrib-update-chain");
+    ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
     ureq.process(client);
   }
   
@@ -392,7 +410,7 @@ public class FullDistributedZkTest exten
 
     UpdateRequest ureq = new UpdateRequest();
     ureq.add(doc);
-    ureq.setParam("update.chain", "distrib-update-chain");
+    ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
     ureq.process(client);
     
     // add to control second in case adding to shards fails
@@ -403,7 +421,7 @@ public class FullDistributedZkTest exten
     controlClient.deleteByQuery(q);
     for (SolrServer client : clients) {
       UpdateRequest ureq = new UpdateRequest();
-      ureq.setParam("update.chain", "distrib-update-chain");
+      ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
       ureq.deleteByQuery(q).process(client);
     }
   }// serial commit...
@@ -416,7 +434,7 @@ public class FullDistributedZkTest exten
   @Override
   public void doTest() throws Exception {
     // TODO: remove the need for this...
-    Thread.sleep(1000);
+    //Thread.sleep(1000);
     //pause for cloud state to be updated with latest...
     
     handle.clear();
@@ -593,7 +611,6 @@ public class FullDistributedZkTest exten
     query("q", "*:*", "sort", "n_tl1 desc");
     
     // TMP: try adding a doc with CloudSolrServer
-    CloudSolrServer cloudClient = new CloudSolrServer(zkServer.getZkAddress());
     cloudClient.setDefaultCollection(DEFAULT_COLLECTION);
     SolrQuery query = new SolrQuery("*:*");
     query.add("distrib", "true");
@@ -606,7 +623,7 @@ public class FullDistributedZkTest exten
 
     UpdateRequest ureq = new UpdateRequest();
     ureq.add(doc);
-    ureq.setParam("update.chain", "distrib-update-chain");
+    ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
     ureq.process(cloudClient);
     
     commit();
@@ -615,8 +632,6 @@ public class FullDistributedZkTest exten
     
     long numFound2 = cloudClient.query(query).getResults().getNumFound();
     
-    cloudClient.close();
-    
     // lets just check that the one doc since last commit made it in...
     assertEquals(numFound1 + 1, numFound2);
     
@@ -650,32 +665,22 @@ public class FullDistributedZkTest exten
 
     deadShard.start(true);
     
-    List<SolrServer> s2c = shardToClient.get("shard2");
+    waitForRecovery(deadShard);
     
-    // we should poll until state change goes to active
-    Thread.sleep(2000);
+    List<SolrServer> s2c = shardToClient.get("shard2");
 
-    //assertDocCounts();
     // if we properly recovered, we should now have the couple missing docs that
     // came in while shard was down
     assertEquals(s2c.get(0).query(new SolrQuery("*:*")).getResults()
         .getNumFound(), s2c.get(1).query(new SolrQuery("*:*"))
         .getResults().getNumFound());
     
-    // kill the other shard3 replica
-   // JettySolrRunner deadShard3 = killShard("shard3", 0);
-    
-    // should fail
-    //query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
-    
     query("q", "*:*", "sort", "n_tl1 desc");
     
+    // test adding another replica to a shard - it should do a recovery/replication to pick up the index from the leader
-    createJettys(1);
+    JettySolrRunner newReplica = createJettys(1).get(0);
     
-    // TODO: poll instead
-    // wait for replication
-    Thread.sleep(6000);
+    waitForRecovery(newReplica);
     
     // new server should be part of first shard
     // how many docs are on the new shard?
@@ -693,8 +698,42 @@ public class FullDistributedZkTest exten
 
     assertDocCounts();
     
+//    String docId = "99999999";
+//    indexr("id", docId, t1, "originalcontent");
+//    
+//    commit();
+//    
+//    ModifiableSolrParams params = new ModifiableSolrParams();
+//    params.add("distrib", "true");
+//    params.add("q", t1 + ":originalcontent");
+//    QueryResponse results = clients.get(0).query(params);
+//    assertEquals(1, results.getResults().getNumFound());
+//    System.out.println("results:" + results);
+//    
+//    // update doc
+//    indexr("id", docId, t1, "updatedcontent");
+//    
+//    commit();
+//    
+//    results = clients.get(0).query(params);
+//    assertEquals(0, results.getResults().getNumFound());
+//    
+//    params.set("q", t1 + ":updatedcontent");
+//    
+//    results = clients.get(0).query(params);
+//    assertEquals(1, results.getResults().getNumFound());
+//    
+//    UpdateRequest uReq = new UpdateRequest();
+//    uReq.setParam(UpdateParams.UPDATE_CHAIN, DISTRIB_UPDATE_CHAIN);
+//    uReq.deleteById(docId).process(clients.get(0));
+//    
+//    commit();
+//    
+//    results = clients.get(0).query(params);
+//    assertEquals(0, results.getResults().getNumFound());
+    
     // expire a session...
-    CloudJettyRunner cloudJetty = shardToJetty.get("shard1").get(0);
+    //CloudJettyRunner cloudJetty = shardToJetty.get("shard1").get(0);
     //chaosMonkey.expireSession(cloudJetty);
     
     // should cause another recovery...
@@ -710,12 +749,21 @@ public class FullDistributedZkTest exten
     long controlCount = controlClient.query(new SolrQuery("*:*")).getResults().getNumFound();
 
     // do some really inefficient mapping...
-    ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000, AbstractZkTestCase.TIMEOUT);
-    zk.createClusterStateWatchersAndUpdate();
-  //  Map<SolrServer,ZkNodeProps> clientToInfo = new HashMap<SolrServer,ZkNodeProps>();
-    Map<String,Slice> slices = zk.getCloudState().getSlices(DEFAULT_COLLECTION);
- 
-    zk.close();
+    ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000,
+        AbstractZkTestCase.TIMEOUT);
+    Map<String,Slice> slices = null;
+    CloudState cloudState;
+    try {
+      zk.createClusterStateWatchersAndUpdate();
+      cloudState = zk.getCloudState();
+      slices = cloudState.getSlices(DEFAULT_COLLECTION);
+    } finally {
+      zk.close();
+    }
+    
+    if (slices == null) {
+      throw new RuntimeException("Could not find collection " + DEFAULT_COLLECTION + " in " + cloudState.getCollections());
+    }
 
     for (SolrServer client : clients) {
       for (Map.Entry<String,Slice> slice : slices.entrySet()) {
@@ -742,6 +790,32 @@ public class FullDistributedZkTest exten
     query.add("distrib", "true");
     assertEquals("Doc Counts do not add up", controlCount, cloudClient.query(query).getResults().getNumFound());
   }
+  
+  protected void waitForRecovery(JettySolrRunner replica)
+      throws InterruptedException {
+    final CountDownLatch recoveryLatch = new CountDownLatch(1);
+    RecoveryStrat recoveryStrat = ((SolrDispatchFilter) replica.getDispatchFilter().getFilter()).getCores()
+        .getZkController().getRecoveryStrat();
+    
+    recoveryStrat.setRecoveryListener(new RecoveryListener() {
+      
+      @Override
+      public void startRecovery() {}
+      
+      @Override
+      public void finishedReplication() {}
+      
+      @Override
+      public void finishedRecovery() {
+        recoveryLatch.countDown();
+      }
+    });
+    
+    // wait for recovery to finish
+    // if it takes over n seconds, assume we didnt get our listener attached before
+    // recover started - it should be done before n though
+    recoveryLatch.await(30, TimeUnit.SECONDS);
+  }
 
   @Override
  protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
Mon Dec  5 15:47:46 2011
@@ -30,14 +30,15 @@ import org.apache.solr.common.SolrInputD
 import org.apache.solr.servlet.SolrDispatchFilter;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *
  */
 public class RecoveryZkTest extends FullDistributedZkTest {
 
-  
+  private static Logger log = LoggerFactory.getLogger(RecoveryZkTest.class);
   @BeforeClass
   public static void beforeSuperClass() throws Exception {
     System.setProperty("mockdir.checkindex", "false");
@@ -56,8 +57,8 @@ public class RecoveryZkTest extends Full
   
   @Override
   public void doTest() throws Exception {
-    // nocommit: remove the need for this
-    Thread.sleep(5000);
+    // GRRRRR - this is needed because it takes a while for all the shards to learn about the cluster state
+    //Thread.sleep(5000);
     
     handle.clear();
     handle.put("QTime", SKIPVAL);
@@ -114,34 +115,13 @@ public class RecoveryZkTest extends Full
     // wait a moment - lets allow some docs to be indexed so replication time is non 0
     Thread.sleep(4000);
 
-    final CountDownLatch recoveryLatch = new CountDownLatch(1);
+    
 
     
     // bring shard replica up
     replica.start();
     
-    RecoveryStrat recoveryStrat = ((SolrDispatchFilter) replica.getDispatchFilter().getFilter()).getCores()
-        .getZkController().getRecoveryStrat();
-    
-    recoveryStrat.setRecoveryListener(new RecoveryListener() {
-      
-      @Override
-      public void startRecovery() {}
-      
-      @Override
-      public void finishedReplication() {}
-      
-      @Override
-      public void finishedRecovery() {
-        recoveryLatch.countDown();
-      }
-    });
-    
-    
-    // wait for recovery to finish
-    // if it takes over n seconds, assume we didnt get our listener attached before
-    // recover started - it should be done before n though
-    recoveryLatch.await(30, TimeUnit.SECONDS);
+    waitForRecovery(replica);
     
     // stop indexing threads
     indexThread.safeStop();

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
(original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
Mon Dec  5 15:47:46 2011
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServer;
@@ -48,6 +49,7 @@ public class CloudSolrServer extends Sol
   private String defaultCollection;
   private LBHttpSolrServer lbServer;
   Random rand = new Random();
+  static AtomicInteger cnt = new AtomicInteger(0);
 
   /**
    * @param zkHost The address of the zookeeper quorum containing the cloud state
@@ -60,6 +62,7 @@ public class CloudSolrServer extends Sol
    * @param zkHost The address of the zookeeper quorum containing the cloud state
    */
   public CloudSolrServer(String zkHost, LBHttpSolrServer lbServer) {
+    System.out.println("new cloud server");
     this.zkHost = zkHost;
     this.lbServer = lbServer;
   }
@@ -88,24 +91,33 @@ public class CloudSolrServer extends Sol
    * @throws InterruptedException
    */
   public void connect() {
-    if (zkStateReader != null) return;
-    synchronized(this) {
-      if (zkStateReader != null) return;
-      try {
-        ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
-        zk.createClusterStateWatchersAndUpdate();
-        zkStateReader = zk;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-      } catch (KeeperException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-
-      } catch (IOException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-
-      } catch (TimeoutException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    if (zkStateReader == null) {
+      synchronized (this) {
+        if (zkStateReader == null) {
+          try {
+            if (cnt.incrementAndGet() > 1) {
+              throw new IllegalStateException();
+            }
+            System.out.println("SHOULD ONLY HAPPEN ONCE");
+            ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout,
+                zkClientTimeout, true);
+            zk.createClusterStateWatchersAndUpdate();
+            zkStateReader = zk;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (KeeperException e) {
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (IOException e) {
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (TimeoutException e) {
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          }
+        }
       }
     }
   }
@@ -125,7 +137,9 @@ public class CloudSolrServer extends Sol
 
     Map<String,Slice> slices = cloudState.getSlices(collection);
     
-    // nocommit: if slices is null, the collection cannot be found
+    if (slices == null) {
+      throw new RuntimeException("Could not find collection in zk: " + cloudState);
+    }
     
     Set<String> liveNodes = cloudState.getLiveNodes();
 

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
(original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
Mon Dec  5 15:47:46 2011
@@ -35,19 +35,19 @@ import org.slf4j.LoggerFactory;
 // quasi immutable :(
 public class CloudState {
 	protected static Logger log = LoggerFactory.getLogger(CloudState.class);
-	private final Map<String, Map<String, Slice>> collectionStates;
+	private final Map<String, Map<String,Slice>> collectionStates;
 	private final Set<String> liveNodes;
 
 	public CloudState() {
 		this.liveNodes = new HashSet<String>();
-		this.collectionStates = new HashMap<String, Map<String, Slice>>(0);
+		this.collectionStates = new HashMap<String,Map<String,Slice>>(0);
 	}
 
 	public CloudState(Set<String> liveNodes,
-			Map<String, Map<String, Slice>> collectionStates) {
+			Map<String, Map<String,Slice>> collectionStates) {
 		this.liveNodes = new HashSet<String>(liveNodes.size());
 		this.liveNodes.addAll(liveNodes);
-		this.collectionStates = new HashMap<String, Map<String, Slice>>(collectionStates.size());
+		this.collectionStates = new HashMap<String, Map<String,Slice>>(collectionStates.size());
 		this.collectionStates.putAll(collectionStates);
 	}
 
@@ -62,12 +62,12 @@ public class CloudState {
 	public void addSlice(String collection, Slice slice) {
 		if (!collectionStates.containsKey(collection)) {
 			log.info("New collection");
-			collectionStates.put(collection, new HashMap<String, Slice>());
+			collectionStates.put(collection, new HashMap<String,Slice>());
 		}
 		if (!collectionStates.get(collection).containsKey(slice.getName())) {
 			collectionStates.get(collection).put(slice.getName(), slice);
 		} else {
-			Map<String, ZkNodeProps> shards = new HashMap<String, ZkNodeProps>();
+			Map<String,ZkNodeProps> shards = new HashMap<String,ZkNodeProps>();
 			
 			Slice existingSlice = collectionStates.get(collection).get(slice.getName());
 			shards.putAll(existingSlice.getShards());
@@ -119,7 +119,7 @@ public class CloudState {
 	
 	public static CloudState load(byte[] bytes, Set<String> liveNodes) throws KeeperException,
InterruptedException, IOException {
     if (bytes == null || bytes.length == 0) {
-      return new CloudState(liveNodes, Collections.EMPTY_MAP);
+      return new CloudState(liveNodes, Collections.<String, Map<String,Slice>>emptyMap());
     }
     
     LinkedHashMap<String, Object> stateMap = (LinkedHashMap<String, Object>)
ObjectBuilder.fromJSON(new String(bytes, "utf-8"));

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1210505&r1=1210504&r2=1210505&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
(original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Mon Dec  5 15:47:46 2011
@@ -58,8 +58,6 @@ public class ZkStateReader {
   public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
 
 
-
-
   private static class ZKTF implements ThreadFactory {
     private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
     @Override
@@ -76,12 +74,20 @@ public class ZkStateReader {
   private SolrZkClient zkClient;
   
   private boolean closeClient = false;
+
+  private boolean debugLog;
   
   public ZkStateReader(SolrZkClient zkClient) {
     this.zkClient = zkClient;
   }
   
   public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout)
throws InterruptedException, TimeoutException, IOException {
+    this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, false);
+  }
+  
+  public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout,
boolean debugLog) throws InterruptedException, TimeoutException, IOException {
+    this.debugLog = debugLog;
+    if (debugLog) System.out.println("NEW ZKREADER");
     closeClient = true;
     zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
         // on reconnect, reload cloud info
@@ -124,33 +130,37 @@ public class ZkStateReader {
       InterruptedException, IOException {
     // We need to fetch the current cluster state and the set of live nodes
     
-    if (!zkClient.exists(CLUSTER_STATE)) {
-      try {
-        zkClient.create(CLUSTER_STATE, null, CreateMode.PERSISTENT);
-      } catch (KeeperException e) {
-        // if someone beats us to creating this ignore it
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
-          throw e;
+    synchronized (getUpdateLock()) {
+      if (!zkClient.exists(CLUSTER_STATE)) {
+        try {
+          zkClient.create(CLUSTER_STATE, null, CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+          // if someone beats us to creating this ignore it
+          if (e.code() != KeeperException.Code.NODEEXISTS) {
+            throw e;
+          }
         }
       }
     }
     
-    CloudState clusterState;
     
     log.info("Updating cluster state from ZooKeeper... ");
     zkClient.exists(CLUSTER_STATE, new Watcher() {
       
       @Override
       public void process(WatchedEvent event) {
+        if (debugLog) System.out.println("cluster change triggered");
         log.info("A cluster state change has occurred");
         try {
-          // remake watch
-          byte[] data = zkClient.getData(CLUSTER_STATE, this, null);
+          
           // delayed approach
           // ZkStateReader.this.updateCloudState(false, false);
           synchronized (ZkStateReader.this.getUpdateLock()) {
-            CloudState clusterState = CloudState.load(data, ZkStateReader.this.cloudState
-                .getLiveNodes());
+            // remake watch
+            byte[] data = zkClient.getData(CLUSTER_STATE, this, null);
+            CloudState clusterState = CloudState.load(data,
+                ZkStateReader.this.cloudState.getLiveNodes());
+            if (debugLog) System.out.println("update cluster:" + clusterState.getCollections());
             // update volatile
             cloudState = clusterState;
           }
@@ -168,11 +178,7 @@ public class ZkStateReader {
           Thread.currentThread().interrupt();
           log.warn("", e);
           return;
-        } /*
-           * catch(IOException e){ log.error("", e); throw new
-           * ZooKeeperException( SolrException.ErrorCode.SERVER_ERROR, "", e); }
-           */ 
-        catch (IOException e) {
+        } catch (IOException e) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
               "Could not serialize cloud state", e);
         } 
@@ -180,47 +186,49 @@ public class ZkStateReader {
       
     });
     
-    
-    List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
-        new Watcher() {
-          
-          @Override
-          public void process(WatchedEvent event) {
-            log.info("Updating live nodes");
-            try {
-              // delayed approach
-              // ZkStateReader.this.updateCloudState(false, true);
-              synchronized (ZkStateReader.this.getUpdateLock()) {
-                List<String> liveNodes = zkClient.getChildren(
-                    LIVE_NODES_ZKNODE, this);
-                Set<String> liveNodesSet = new HashSet<String>();
-                liveNodesSet.addAll(liveNodes);
-                CloudState clusterState = new CloudState(liveNodesSet, cloudState.getCollectionStates());
-                // update volatile
-                cloudState = clusterState;
-              }
-            } catch (KeeperException e) {
-              if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+    synchronized (ZkStateReader.this.getUpdateLock()) {
+      List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
+          new Watcher() {
+            
+            @Override
+            public void process(WatchedEvent event) {
+              log.info("Updating live nodes");
+              try {
+                // delayed approach
+                // ZkStateReader.this.updateCloudState(false, true);
+                synchronized (ZkStateReader.this.getUpdateLock()) {
+                  List<String> liveNodes = zkClient.getChildren(
+                      LIVE_NODES_ZKNODE, this);
+                  Set<String> liveNodesSet = new HashSet<String>();
+                  liveNodesSet.addAll(liveNodes);
+                  CloudState clusterState = new CloudState(liveNodesSet,
+                      ZkStateReader.this.cloudState.getCollectionStates());
+                  ZkStateReader.this.cloudState = clusterState;
+                }
+              } catch (KeeperException e) {
+                if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                    || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                  return;
+                }
+                log.error("", e);
+                throw new ZooKeeperException(
+                    SolrException.ErrorCode.SERVER_ERROR, "", e);
+              } catch (InterruptedException e) {
+                // Restore the interrupted status
+                Thread.currentThread().interrupt();
+                log.warn("", e);
                 return;
               }
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (InterruptedException e) {
-              // Restore the interrupted status
-              Thread.currentThread().interrupt();
-              log.warn("", e);
-              return;
             }
-          }
-          
-        });
-    Set<String> liveNodeSet = new HashSet<String>();
-    liveNodeSet.addAll(liveNodes);
-    clusterState = CloudState.load(zkClient, liveNodeSet);
-    this.cloudState = clusterState;
+            
+          });
+      Set<String> liveNodeSet = new HashSet<String>();
+      liveNodeSet.addAll(liveNodes);
+      CloudState clusterState = CloudState.load(zkClient, liveNodeSet);
+      if (debugLog) System.out.println("make cluster:" + clusterState.getCollections());
+      this.cloudState = clusterState;
+    }
   }
   
   
@@ -233,19 +241,22 @@ public class ZkStateReader {
     
     if (immediate) {
       CloudState clusterState;
+      synchronized (getUpdateLock()) {
       List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null);
       Set<String> liveNodesSet = new HashSet<String>();
       liveNodesSet.addAll(liveNodes);
-      if (!onlyLiveNodes) {
-        log.info("Updating cloud state from ZooKeeper... ");
-
-        clusterState = CloudState.load(zkClient, liveNodesSet);
-      } else {
-        log.info("Updating live nodes from ZooKeeper... ");
-        clusterState = cloudState;
+      
+        if (!onlyLiveNodes) {
+          log.info("Updating cloud state from ZooKeeper... ");
+          
+          clusterState = CloudState.load(zkClient, liveNodesSet);
+        } else {
+          log.info("Updating live nodes from ZooKeeper... ");
+          clusterState = new CloudState(liveNodesSet,
+              ZkStateReader.this.cloudState.getCollectionStates());
+        }
       }
- 
-      // update volatile
+
       this.cloudState = clusterState;
     } else {
       if (cloudStateUpdateScheduled) {
@@ -276,7 +287,6 @@ public class ZkStateReader {
                 clusterState = new CloudState(liveNodesSet, ZkStateReader.this.cloudState.getCollectionStates());
               }
               
-              // update volatile
               ZkStateReader.this.cloudState = clusterState;
               
             } catch (KeeperException e) {



Mime
View raw message