lucene-commits mailing list archives

From: no...@apache.org
Subject: svn commit: r1624556 [1/2] - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/java/org/apache/solr/servlet/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/clie...
Date: Fri, 12 Sep 2014 14:11:17 GMT
Author: noble
Date: Fri Sep 12 14:11:17 2014
New Revision: 1624556

URL: http://svn.apache.org/r1624556
Log:
split clusterstate.json SOLR-5473, SOLR-5474, SOLR-5810

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Sep 12 14:11:17 2014
@@ -144,6 +144,13 @@ New Features
 * SOLR-5098: Schema API: Add REST support for adding field types to the schema.
   (Timothy Potter)
 
+* SOLR-5473 : Split clusterstate.json per collection and watch states selectively
+              (Noble Paul, Mark Miller, shalin, Jessica Cheng Mallet, Timothy Potter, Anshum Gupta)
+
+* SOLR-5474 : Support for SOLR-5473 in SolrJ (Timothy Potter, Noble Paul, Mark Miller)
+
+* SOLR-5810 : Support for SOLR-5473 in solr admin UI (Timothy Potter, Noble Paul)
+
 Bug Fixes
 ----------------------
 

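The three CHANGES entries above describe one feature: a collection created with stateFormat=2 keeps its state in its own znode under /collections/<name> (resolved via ZkStateReader.getCollectionPath) instead of the shared clusterstate.json, and nodes and clients watch only the collections they actually use. The following is a minimal sketch of requesting the new format at creation time, mirroring the request pattern used in ExternalCollectionsTest further down; the ZooKeeper address, collection name and shard counts are assumptions, and the literal value behind DocCollection.STATE_FORMAT is taken on trust from the handler wiring below.

import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;

public class CreateSplitStateCollection {
  public static void main(String[] args) throws Exception {
    CloudSolrServer client = new CloudSolrServer("localhost:9983"); // assumed ZK address
    try {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("action", CollectionParams.CollectionAction.CREATE.toString());
      params.set("name", "myExternColl");
      params.set("numShards", "2");
      params.set("replicationFactor", "2");
      // ask for a per-collection state znode instead of the shared clusterstate.json
      params.set(DocCollection.STATE_FORMAT, "2");
      QueryRequest request = new QueryRequest(params);
      request.setPath("/admin/collections");
      client.request(request);
    } finally {
      client.shutdown();
    }
  }
}
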
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Sep 12 14:11:17 2014
@@ -108,6 +108,8 @@ public class Overseer implements Closeab
     private Map clusterProps;
     private boolean isClosed = false;
 
+    private final Map<String, Object> updateNodes = new LinkedHashMap<>();
+    private boolean isClusterStateModified = false;
 
 
     public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
@@ -261,6 +263,7 @@ public class Overseer implements Closeab
                 stateUpdateQueue.poll();
 
                 if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
+                if(!updateNodes.isEmpty()) break;
                 // if an event comes in the next 100ms batch it together
                 head = stateUpdateQueue.peek(100);
               }
@@ -300,8 +303,30 @@ public class Overseer implements Closeab
       TimerContext timerContext = stats.time("update_state");
       boolean success = false;
       try {
-        zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true);
-        lastUpdatedTime = System.nanoTime();
+        if (!updateNodes.isEmpty()) {
+          for (Entry<String,Object> e : updateNodes.entrySet()) {
+            if (e.getValue() == null) {
+              if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true);
+            } else {
+              byte[] data = ZkStateReader.toJSON(e.getValue());
+              if (zkClient.exists(e.getKey(), true)) {
+                log.info("going to update_collection {}", e.getKey());
+                zkClient.setData(e.getKey(), data, true);
+              } else {
+                log.info("going to create_collection {} {}", e.getKey(), new String(data));
+                zkClient.create(e.getKey(), data, CreateMode.PERSISTENT, true);
+              }
+            }
+          }
+          updateNodes.clear();
+        }
+        
+        if (isClusterStateModified) {
+          lastUpdatedTime = System.nanoTime();
+          zkClient.setData(ZkStateReader.CLUSTER_STATE,
+              ZkStateReader.toJSON(clusterState), true);
+          isClusterStateModified = false;
+        }
         success = true;
       } finally {
         timerContext.stop();
@@ -703,7 +728,7 @@ public class Overseer implements Closeab
         }
 
         Slice slice = clusterState.getSlice(collection, sliceName);
-        
+
         Map<String,Object> replicaProps = new LinkedHashMap<>();
 
         replicaProps.putAll(message.getProperties());
@@ -721,7 +746,7 @@ public class Overseer implements Closeab
           replicaProps.remove(ZkStateReader.SHARD_ID_PROP);
           replicaProps.remove(ZkStateReader.COLLECTION_PROP);
           replicaProps.remove(QUEUE_OPERATION);
-          
+
           // remove any props with null values
           Set<Entry<String,Object>> entrySet = replicaProps.entrySet();
           List<String> removeKeys = new ArrayList<>();
@@ -869,10 +894,22 @@ public class Overseer implements Closeab
         }
         collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
 
-        if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true");
-        DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
-        return newState(state, singletonMap(newCollection.getName(), newCollection));
+      if (message.getStr("fromApi") == null) {
+        collectionProps.put("autoCreated", "true");
       }
+      
+      String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
+          : ZkStateReader.getCollectionPath(collectionName);
+      
+      DocCollection newCollection = new DocCollection(collectionName,
+          newSlices, collectionProps, router, -1, znode);
+      
+      isClusterStateModified = true;
+      
+      log.info("state version {} {}", collectionName, newCollection.getStateFormat());
+      
+      return newState(state, singletonMap(newCollection.getName(), newCollection));
+    }
 
       /*
        * Return an already assigned id or null if not assigned
@@ -909,30 +946,28 @@ public class Overseer implements Closeab
         }
         return null;
       }
-      
+
       private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
         // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
         // System.out.println("Updating slice:" + slice);
-
+        DocCollection newCollection = null;
         DocCollection coll = state.getCollectionOrNull(collectionName) ;
         Map<String,Slice> slices;
-        Map<String,Object> props;
-        DocRouter router;
-
+        
         if (coll == null) {
           //  when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
           // without explicitly creating a collection.  In this current case, we assume custom sharding with an "implicit" router.
-          slices = new HashMap<>(1);
-          props = new HashMap<>(1);
+          slices = new LinkedHashMap<>(1);
+          slices.put(slice.getName(), slice);
+          Map<String,Object> props = new HashMap<>(1);
           props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
-          router = new ImplicitDocRouter();
+          newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
         } else {
-          props = coll.getProperties();
-          router = coll.getRouter();
           slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy
+          slices.put(slice.getName(), slice);
+          newCollection = coll.copyWithSlices(slices);
         }
-        slices.put(slice.getName(), slice);
-        DocCollection newCollection = new DocCollection(collectionName, slices, props, router);
+
 
         // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
 
@@ -991,28 +1026,53 @@ public class Overseer implements Closeab
         }
 
 
-        DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter());
+        DocCollection newCollection = coll.copyWithSlices(slices);
         return newState(state, singletonMap(collectionName, newCollection));
       }
 
-      private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
-        return state.copyWith(colls);
-      }
+    private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
+      for (Entry<String, DocCollection> e : colls.entrySet()) {
+        DocCollection c = e.getValue();
+        if (c == null) {
+          isClusterStateModified = true;
+          state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null));
+          updateNodes.put(ZkStateReader.getCollectionPath(e.getKey()) ,null);
+          continue;
+        }
 
-      /*
-       * Remove collection from cloudstate
-       */
-      private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
-        final String collection = message.getStr("name");
-        if (!checkKeyExistence(message, "name")) return clusterState;
-        DocCollection coll = clusterState.getCollectionOrNull(collection);
-        if(coll !=null) {
-          return clusterState.copyWith(singletonMap(collection,(DocCollection)null));
+        if (c.getStateFormat() > 1) {
+          updateNodes.put(ZkStateReader.getCollectionPath(c.getName()),
+              new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c)));
+        } else {
+          isClusterStateModified = true;
         }
-        return clusterState;
+        state = state.copyWith(singletonMap(e.getKey(), c));
+
       }
+      return state;
+    }
 
     /*
+     * Remove collection from cloudstate
+     */
+    private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
+      final String collection = message.getStr("name");
+      if (!checkKeyExistence(message, "name")) return clusterState;
+      DocCollection coll = clusterState.getCollectionOrNull(collection);
+      if(coll == null) return  clusterState;
+
+      isClusterStateModified = true;
+      if (coll.getStateFormat() > 1) {
+        try {
+          log.info("Deleting state for collection : {}", collection);
+          zkClient.delete(ZkStateReader.getCollectionPath(collection), -1, true);
+        } catch (Exception e) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to remove collection state :" + collection);
+        }
+      }
+      return newState(clusterState, singletonMap(coll.getName(),(DocCollection) null));
+    }
+    /*
      * Remove collection slice from cloudstate
      */
     private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
@@ -1027,7 +1087,7 @@ public class Overseer implements Closeab
       Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap());
       newSlices.remove(sliceId);
 
-      DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
+      DocCollection newCollection = coll.copyWithSlices(newSlices);
       return newState(clusterState, singletonMap(collection,newCollection));
     }
 
@@ -1039,8 +1099,6 @@ public class Overseer implements Closeab
         final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
         if (!checkCollectionKeyExistence(message)) return clusterState;
 
-//        final Map<String, DocCollection> newCollections = new LinkedHashMap<>(clusterState.getCollectionStates()); // shallow copy
-//        DocCollection coll = newCollections.get(collection);
         DocCollection coll = clusterState.getCollectionOrNull(collection) ;
         if (coll == null) {
           // TODO: log/error that we didn't find it?
@@ -1078,7 +1136,7 @@ public class Overseer implements Closeab
             newSlices.put(slice.getName(), slice);
           }
         }
-        
+
         if (lastSlice) {
           // remove all empty pre allocated slices
           for (Slice slice : coll.getSlices()) {
@@ -1095,7 +1153,7 @@ public class Overseer implements Closeab
           // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
           // ZkController out of the Overseer.
           try {
-            zkClient.clean("/collections/" + collection);
+            zkClient.delete("/collections/" + collection, -1, true);
           } catch (InterruptedException e) {
             SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
             Thread.currentThread().interrupt();
@@ -1105,8 +1163,8 @@ public class Overseer implements Closeab
           return newState(clusterState,singletonMap(collection, (DocCollection) null));
 
         } else {
-          DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
-           return newState(clusterState,singletonMap(collection,newCollection));
+          DocCollection newCollection = coll.copyWithSlices(newSlices);
+          return newState(clusterState,singletonMap(collection,newCollection));
         }
 
      }
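
The Overseer change above boils down to a routing rule: a mutation to a stateFormat > 1 collection is queued in updateNodes as a write (or delete) of that collection's own znode, while a mutation to a legacy collection only marks the shared clusterstate.json dirty via isClusterStateModified, and updateZkStates() then flushes both. Below is a minimal, self-contained sketch of that rule under stated assumptions; the znode path layout and the class/method names are illustrative, only the two fields mirror the patch.

import java.util.LinkedHashMap;
import java.util.Map;

public class StateRoutingSketch {
  // pending per-collection znode writes keyed by path; a null value means "delete this znode"
  private final Map<String, Object> updateNodes = new LinkedHashMap<>();
  // whether the shared clusterstate.json still has to be rewritten
  private boolean isClusterStateModified = false;

  void collectionChanged(String collection, int stateFormat, Object serializedState) {
    if (stateFormat > 1) {
      // external collection: its state lives in its own znode (path layout assumed)
      updateNodes.put("/collections/" + collection + "/state.json", serializedState);
    } else {
      // legacy collection: it only exists inside the shared clusterstate.json
      isClusterStateModified = true;
    }
  }

  void collectionRemoved(String collection, int stateFormat) {
    if (stateFormat > 1) {
      updateNodes.put("/collections/" + collection + "/state.json", null); // queue znode delete
    }
    // mirrors newState(): a removal marks the shared file dirty unconditionally
    isClusterStateModified = true;
  }
}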

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Sep 12 14:11:17 2014
@@ -698,7 +698,7 @@ public class OverseerCollectionProcessor
 
     // convert cluster state into a map of writable types
     byte[] bytes = ZkStateReader.toJSON(clusterState);
-    Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
+    Map<String, Object> stateMap = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
 
     String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
     NamedList<Object> collectionProps = new SimpleOrderedMap<Object>();
@@ -706,7 +706,13 @@ public class OverseerCollectionProcessor
       Set<String> collections = clusterState.getCollections();
       for (String name : collections) {
         Map<String, Object> collectionStatus = null;
-        collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, shard);
+        if (clusterState.getCollection(name).getStateFormat() > 1) {
+          bytes = ZkStateReader.toJSON(clusterState.getCollection(name));
+          Map<String, Object> docCollection = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
+          collectionStatus = getCollectionStatus(docCollection, name, shard);
+        } else  {
+          collectionStatus = getCollectionStatus((Map<String,Object>) stateMap.get(name), name, shard);
+        }
         if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty())  {
           collectionStatus.put("aliases", collectionVsAliases.get(name));
         }
@@ -715,8 +721,12 @@ public class OverseerCollectionProcessor
     } else {
       String routeKey = message.getStr(ShardParams._ROUTE_);
       Map<String, Object> docCollection = null;
-
-      docCollection = (Map<String, Object>) stateMap.get(collection);
+      if (clusterState.getCollection(collection).getStateFormat() > 1) {
+        bytes = ZkStateReader.toJSON(clusterState.getCollection(collection));
+        docCollection = (Map<String,Object>) ZkStateReader.fromJSON(bytes);
+      } else  {
+        docCollection = (Map<String,Object>) stateMap.get(collection);
+      }
       if (routeKey == null) {
         Map<String, Object> collectionStatus = getCollectionStatus(docCollection, collection, shard);
         if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty())  {
@@ -2409,11 +2419,15 @@ public class OverseerCollectionProcessor
 
     }
 
-    if(configName!= null){
-      log.info("creating collections conf node {} ",ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll);
-      zkStateReader.getZkClient().makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll,
-          ZkStateReader.toJSON(ZkNodeProps.makeMap(ZkController.CONFIGNAME_PROP,configName)),true );
-
+    if (configName != null) {
+      String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
+      log.info("creating collections conf node {} ", collDir);
+      byte[] data = ZkStateReader.toJSON(ZkNodeProps.makeMap(ZkController.CONFIGNAME_PROP, configName));
+      if (zkStateReader.getZkClient().exists(collDir, true)) {
+        zkStateReader.getZkClient().setData(collDir, data, true);
+      } else {
+        zkStateReader.getZkClient().makePath(collDir, data, true);
+      }
     } else {
       if(isLegacyCloud){
         log.warn("Could not obtain config name");
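
One small but useful change in this file: the node under /collections/<coll> is now created or updated rather than blindly created, so creating a collection whose znode already exists no longer fails. A hedged sketch of that create-or-update idiom with SolrZkClient follows; the property key literal and the error handling are simplifications of what the processor actually does.

import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;

public class ConfNodeUpsert {
  /** Writes {"configName": <configName>} to /collections/<collection>, creating the node if needed. */
  public static void upsert(SolrZkClient zkClient, String collection, String configName) throws Exception {
    String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
    byte[] data = ZkStateReader.toJSON(ZkNodeProps.makeMap("configName", configName)); // same key as ZkController.CONFIGNAME_PROP
    if (zkClient.exists(collDir, true)) {
      zkClient.setData(collDir, data, true);   // node already there: just update its data
    } else {
      zkClient.makePath(collDir, data, true);  // create the node (and any missing parents)
    }
  }
}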

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Sep 12 14:11:17 2014
@@ -1165,6 +1165,19 @@ public final class ZkController {
     }
 
     CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
+    boolean removeWatch = true;
+    // if there is no SolrCore which is a member of this collection, remove the watch
+    for (SolrCore solrCore : cc.getCores()) {
+      CloudDescriptor cloudDesc = solrCore.getCoreDescriptor()
+          .getCloudDescriptor();
+      if (cloudDesc != null
+          && cloudDescriptor.getCollectionName().equals(
+              cloudDesc.getCollectionName())) {
+        removeWatch = false;
+        break;
+      }
+    }
+    if (removeWatch) zkStateReader.removeZKWatch(collection);
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
         Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName,
         ZkStateReader.NODE_NAME_PROP, getNodeName(),
@@ -1466,6 +1479,11 @@ public final class ZkController {
       }
 
       publish(cd, ZkStateReader.DOWN, false, true);
+      DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
+      if(collection !=null && collection.getStateFormat()>1  ){
+        log.info("Registering watch for external collection {}",cd.getCloudDescriptor().getCollectionName());
+        zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
+      }
     } catch (KeeperException e) {
       log.error("", e);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
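
These two hunks are the selective-watching side of the feature: a node starts watching an external (stateFormat > 1) collection when it registers a core belonging to it, and drops the watch when its last such core is unregistered. The patch decides this by scanning the CoreContainer's live cores; the same bookkeeping can be pictured as a per-collection reference count, sketched below with print statements standing in for ZkStateReader.addCollectionWatch / removeZKWatch (the class itself is illustrative, not Solr code).

import java.util.HashMap;
import java.util.Map;

public class CollectionWatchBookkeeping {
  // number of locally hosted cores per external collection (illustrative data structure)
  private final Map<String, Integer> localCores = new HashMap<>();

  // called when a core of a stateFormat > 1 collection is registered on this node
  synchronized void coreRegistered(String collection) {
    Integer n = localCores.get(collection);
    if (n == null) {
      localCores.put(collection, 1);
      System.out.println("addCollectionWatch(" + collection + ")");  // first local core: start watching
    } else {
      localCores.put(collection, n + 1);
    }
  }

  // called when a core is unregistered; the watch is dropped with the last local core
  synchronized void coreUnregistered(String collection) {
    Integer n = localCores.get(collection);
    if (n == null) return;
    if (n <= 1) {
      localCores.remove(collection);
      System.out.println("removeZKWatch(" + collection + ")");       // no local member left
    } else {
      localCores.put(collection, n - 1);
    }
  }
}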

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Fri Sep 12 14:11:17 2014
@@ -24,6 +24,7 @@ import static org.apache.solr.cloud.Over
 import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.DELETEREPLICA;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
@@ -59,6 +60,7 @@ import org.apache.solr.cloud.OverseerSol
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -142,7 +144,7 @@ public class CollectionsHandler extends 
     if (action == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a);
     }
-    
+
     switch (action) {
       case CREATE: {
         this.handleCreateAction(req, rsp);
@@ -468,15 +470,16 @@ public class CollectionsHandler extends 
         Overseer.QUEUE_OPERATION,
         OverseerCollectionProcessor.CREATECOLLECTION,
         "fromApi","true");
-    copyIfNotNull(req.getParams(), props,
-         "name",
-         ZkStateReader.REPLICATION_FACTOR,
+    copyIfNotNull(req.getParams(),props,
+        "name",
+        REPLICATION_FACTOR,
          COLL_CONF,
          NUM_SLICES,
          MAX_SHARDS_PER_NODE,
          CREATE_NODE_SET,
          SHARDS_PROP,
          ASYNC,
+         DocCollection.STATE_FORMAT,
          AUTO_ADD_REPLICAS,
         "router.");
 
@@ -670,4 +673,9 @@ public class CollectionsHandler extends 
   public String getDescription() {
     return "Manage SolrCloud Collections";
   }
+
+  @Override
+  public String getSource() {
+    return "$URL: https://svn.apache.org/repos/asf/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java $";
+  }
 }
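
With DocCollection.STATE_FORMAT added to the copied parameters, a CREATE request can carry the desired format through to the Overseer, which treats a missing value as 1 (see message.getInt(DocCollection.STATE_FORMAT, 1) in Overseer.java above). A small sketch of that defaulting; the property values other than DocCollection.STATE_FORMAT are made up.

import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkNodeProps;

public class StateFormatDefault {
  public static void main(String[] args) {
    ZkNodeProps legacy   = new ZkNodeProps("name", "collA");
    ZkNodeProps external = new ZkNodeProps("name", "collB", DocCollection.STATE_FORMAT, "2");

    // a missing stateFormat falls back to 1, i.e. the shared clusterstate.json
    System.out.println(legacy.getInt(DocCollection.STATE_FORMAT, 1));    // 1
    System.out.println(external.getInt(DocCollection.STATE_FORMAT, 1));  // 2
  }
}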

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Fri Sep 12 14:11:17 2014
@@ -318,6 +318,7 @@ public class SolrDispatchFilter extends 
             String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
             // don't proxy for internal update requests
             SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
+            checkStateIsValid(cores, queryParams.get(CloudSolrServer.STATE_VERSION));
             if (coreUrl != null
                 && queryParams
                     .get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
@@ -373,6 +374,7 @@ public class SolrDispatchFilter extends 
               if( "/select".equals( path ) || "/select/".equals( path ) ) {
                 solrReq = parser.parse( core, path, req );
 
+                checkStateIsValid(cores,solrReq.getParams().get(CloudSolrServer.STATE_VERSION));
                 String qt = solrReq.getParams().get( CommonParams.QT );
                 handler = core.getRequestHandler( qt );
                 if( handler == null ) {
@@ -462,6 +464,22 @@ public class SolrDispatchFilter extends 
     chain.doFilter(request, response);
   }
 
+  private void checkStateIsValid(CoreContainer cores, String stateVer) {
+    if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) {
+      // may have multiple collections separated by |
+      String[] pairs = StringUtils.split(stateVer, '|');
+      for (String pair : pairs) {
+        String[] pcs = StringUtils.split(pair, ':');
+        if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) {
+          Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0], Integer.parseInt(pcs[1]));
+          
+          if (Boolean.TRUE != status) {
+            throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair + ", valid: " + status);
+          }
+        }
+      }
+    }
+  }
 
   private void processAliases(SolrQueryRequest solrReq, Aliases aliases,
       List<String> collectionsList) {
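
checkStateIsValid() is the server half of a small protocol: the client appends _stateVer_=<collection>:<znodeVersion>, joining multiple pairs with '|', and the server compares each version against ZooKeeper, answering with an INVALID_STATE error when the client's cached state is stale (the client half is in CloudSolrServer below). The following is a self-contained sketch of just the wire format; the helper names are illustrative, only the format itself comes from the patch.

import java.util.LinkedHashMap;
import java.util.Map;

public class StateVerParam {
  static String build(Map<String, Integer> versions) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Integer> e : versions.entrySet()) {
      if (sb.length() > 0) sb.append('|');            // '|' assumed not to appear in collection names
      sb.append(e.getKey()).append(':').append(e.getValue());
    }
    return sb.toString();
  }

  static Map<String, Integer> parse(String stateVer) {
    Map<String, Integer> versions = new LinkedHashMap<>();
    for (String pair : stateVer.split("\\|")) {
      String[] pcs = pair.split(":");
      if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) {
        versions.put(pcs[0], Integer.parseInt(pcs[1]));
      }
    }
    return versions;
  }

  public static void main(String[] args) {
    Map<String, Integer> v = new LinkedHashMap<>();
    v.put("collA", 7);
    v.put("collB", 3);
    String param = build(v);            // "collA:7|collB:3"
    System.out.println(param);
    System.out.println(parse(param));   // {collA=7, collB=3}
  }
}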

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java Fri Sep 12 14:11:17 2014
@@ -24,7 +24,6 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -35,9 +34,6 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.lucene.util.BytesRef;
-import org.noggit.CharArr;
-import org.noggit.JSONWriter;
-import org.noggit.ObjectBuilder;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -46,14 +42,15 @@ import org.apache.solr.common.params.Sol
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.util.FastWriter;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 import org.noggit.CharArr;
 import org.noggit.JSONWriter;
+import org.noggit.ObjectBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+
 /**
  * Zookeeper Info
  *
@@ -90,7 +87,6 @@ public final class ZookeeperInfoServlet 
 
     String path = params.get("path");
     String addr = params.get("addr");
-    boolean all = "true".equals(params.get("all"));
 
     if (addr != null && addr.length() == 0) {
       addr = null;
@@ -110,6 +106,7 @@ public final class ZookeeperInfoServlet 
     ZKPrinter printer = new ZKPrinter(response, out, cores.getZkController(), addr);
     printer.detail = detail;
     printer.dump = dump;
+    printer.isTreeView = (params.get("wt") == null); // this is hacky but tree view requests don't come in with the wt set
 
     try {
       printer.print(path);
@@ -140,6 +137,8 @@ public final class ZookeeperInfoServlet 
     boolean detail = false;
     boolean dump = false;
 
+    boolean isTreeView = false;
+
     String addr; // the address passed to us
     String keeperAddr; // the address we're connected to
 
@@ -385,6 +384,47 @@ public final class ZookeeperInfoServlet 
             dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
           }
         }
+        // pull in external collections too
+        if (ZkStateReader.CLUSTER_STATE.equals(path) && !isTreeView) {
+          SortedMap<String,Object> collectionStates = null;
+          List<String> children = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, null, true);
+          java.util.Collections.sort(children);
+          for (String collection : children) {
+            String collStatePath = ZkStateReader.getCollectionPath(collection);
+            String childDataStr = null;
+            try {
+              byte[] childData = zkClient.getData(collStatePath, null, null, true);
+              if (childData != null) {
+                childDataStr = (new BytesRef(childData)).utf8ToString();
+              }
+            } catch (KeeperException.NoNodeException nne) {
+              // safe to ignore
+            } catch (Exception childErr) {
+              log.error("Failed to get "+collStatePath+" due to: "+childErr);
+            }
+
+            if (childDataStr != null) {
+              if (collectionStates == null) {
+                // initialize lazily as there may not be any external collections
+                collectionStates = new TreeMap<>();
+
+                // add the internal collections
+                if (dataStr != null)
+                  collectionStates.putAll((Map<String,Object>)ObjectBuilder.fromJSON(dataStr));
+              }
+
+              // now add in the external collections
+              Map<String,Object> extColl = (Map<String,Object>)ObjectBuilder.fromJSON(childDataStr);
+              collectionStates.put(collection, extColl.get(collection));
+            }
+          }
+
+          if (collectionStates != null) {
+            CharArr out = new CharArr();
+            new JSONWriter(out, 2).write(collectionStates);
+            dataStr = out.toString();
+          }
+        }
 
         json.writeString("znode");
         json.writeNameSeparator();
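
For the cloud view in the admin UI, the servlet now overlays every per-collection state znode on top of the shared clusterstate.json before rendering, so external collections still show up in the same JSON document. A condensed sketch of that merge is below; error handling is simplified and the sketch assumes getCollectionPath points at the per-collection state znode.

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.noggit.ObjectBuilder;

public class MergedClusterStateJson {
  @SuppressWarnings("unchecked")
  public static String read(SolrZkClient zkClient) throws Exception {
    Map<String, Object> merged = new TreeMap<>();

    // start from the shared clusterstate.json (format-1 collections)
    byte[] shared = zkClient.getData(ZkStateReader.CLUSTER_STATE, null, null, true);
    if (shared != null && shared.length > 0) {
      merged.putAll((Map<String, Object>) ObjectBuilder.fromJSON(new String(shared, StandardCharsets.UTF_8)));
    }

    // overlay each per-collection state znode (format-2 collections)
    List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, null, true);
    for (String coll : collections) {
      byte[] data = null;
      try {
        data = zkClient.getData(ZkStateReader.getCollectionPath(coll), null, null, true);
      } catch (Exception e) {
        // no state znode for this collection: it is already covered by clusterstate.json
      }
      if (data != null && data.length > 0) {
        Map<String, Object> ext = (Map<String, Object>) ObjectBuilder.fromJSON(new String(data, StandardCharsets.UTF_8));
        merged.put(coll, ext.get(coll));
      }
    }

    CharArr out = new CharArr();
    new JSONWriter(out, 2).write(merged);
    return out.toString();
  }
}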

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java?rev=1624556&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java Fri Sep 12 14:11:17 2014
@@ -0,0 +1,115 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase {
+  private CloudSolrServer client;
+
+  @BeforeClass
+  public static void beforeThisClass2() throws Exception {
+
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    System.setProperty("numShards", Integer.toString(sliceCount));
+    System.setProperty("solr.xml.persist", "true");
+    client = createCloudClient(null);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    client.shutdown();
+  }
+
+  protected String getSolrXml() {
+    return "solr-no-core.xml";
+  }
+
+  public ExternalCollectionsTest() {
+    fixShardCount = true;
+
+    sliceCount = 2;
+    shardCount = 4;
+
+    checkCreatedVsState = false;
+  }
+
+
+  @Override
+  public void doTest() throws Exception {
+    testZkNodeLocation();
+  }
+
+
+
+  @Override
+  protected int getStateFormat() {
+    return 2;
+  }
+
+  private void testZkNodeLocation() throws Exception{
+
+    String collectionName = "myExternColl";
+
+    createCollection(collectionName, client, 2, 2);
+
+    waitForRecoveriesToFinish(collectionName, false);
+    assertTrue("collection state does not exist externally",
+        cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
+    Stat stat = new Stat();
+    byte[] data = cloudClient.getZkStateReader().getZkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat, true);
+    DocCollection c = ZkStateReader.getCollectionLive(cloudClient.getZkStateReader(), collectionName);
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    assertEquals("The zkversion of the nodes must be same zkver:" + stat.getVersion() , stat.getVersion(),clusterState.getCollection(collectionName).getZNodeVersion() );
+    assertTrue("DocCollection#getStateFormat() must be > 1", cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getStateFormat() > 1);
+
+
+    // remove collection
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionParams.CollectionAction.DELETE.toString());
+    params.set("name", collectionName);
+    QueryRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+    if (client == null) {
+      client = createCloudClient(null);
+    }
+
+    client.request(request);
+
+    checkForMissingCollection(collectionName);
+    assertFalse("collection state should not exist externally", cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
+
+  }
+}
+

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Fri Sep 12 14:11:17 2014
@@ -18,6 +18,8 @@ package org.apache.solr.client.solrj.imp
  */
 
 import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,13 +31,16 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.http.NoHttpResponseException;
 import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServer;
@@ -66,6 +71,8 @@ import org.apache.solr.common.util.Named
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * SolrJ client class to communicate with SolrCloud.
@@ -78,6 +85,8 @@ import org.apache.zookeeper.KeeperExcept
  * with {@link #setIdField(String)}.
  */
 public class CloudSolrServer extends SolrServer {
+  private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class);
+
   private volatile ZkStateReader zkStateReader;
   private String zkHost; // the zk server address
   private int zkConnectTimeout = 10000;
@@ -86,6 +95,8 @@ public class CloudSolrServer extends Sol
   private final LBHttpSolrServer lbServer;
   private final boolean shutdownLBHttpSolrServer;
   private HttpClient myClient;
+  // number of times to re-read collection state from ZK if a stale state error is received
+  private static final int MAX_STALE_RETRIES = 5;
   Random rand = new Random();
   
   private final boolean updatesToLeaders;
@@ -94,6 +105,7 @@ public class CloudSolrServer extends Sol
       .newCachedThreadPool(new SolrjNamedThreadFactory(
           "CloudSolrServer ThreadPool"));
   private String idField = "id";
+  public static final String STATE_VERSION = "_stateVer_";
   private final Set<String> NON_ROUTABLE_PARAMS;
   {
     NON_ROUTABLE_PARAMS = new HashSet<>();
@@ -111,6 +123,36 @@ public class CloudSolrServer extends Sol
     // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
 
   }
+  private volatile long timeToLive = 60* 1000L;
+
+
+  protected Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
+    @Override
+    public ExpiringCachedDocCollection get(Object key) {
+      ExpiringCachedDocCollection val = super.get(key);
+      if(val == null) return null;
+      if(val.isExpired(timeToLive)) {
+        super.remove(key);
+        return null;
+      }
+      return val;
+    }
+
+  };
+
+  class ExpiringCachedDocCollection {
+    DocCollection cached;
+    long cachedAt;
+
+    ExpiringCachedDocCollection(DocCollection cached) {
+      this.cached = cached;
+      this.cachedAt = System.currentTimeMillis();
+    }
+
+    boolean isExpired(long timeToLive) {
+      return (System.currentTimeMillis() - cachedAt) > timeToLive;
+    }
+  }
 
   /**
    * Create a new client object that connects to Zookeeper and is always aware
@@ -133,7 +175,15 @@ public class CloudSolrServer extends Sol
    *          "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181"
    */
   public CloudSolrServer(String zkHost) {
-      this(zkHost, true);
+      this.zkHost = zkHost;
+      this.myClient = HttpClientUtil.createClient(null);
+      this.lbServer = new LBHttpSolrServer(myClient);
+      this.lbServer.setRequestWriter(new BinaryRequestWriter());
+      this.lbServer.setParser(new BinaryResponseParser());
+      this.updatesToLeaders = true;
+      shutdownLBHttpSolrServer = true;
+      setupStateVerParamOnQueryString(lbServer);
+
   }
   
   /**
@@ -151,6 +201,15 @@ public class CloudSolrServer extends Sol
     this.lbServer.setParser(new BinaryResponseParser());
     this.updatesToLeaders = updatesToLeaders;
     shutdownLBHttpSolrServer = true;
+    setupStateVerParamOnQueryString(lbServer);
+  }
+
+  /** Sets the cache TTL for cached DocCollection objects. This is only applicable for collections which are persisted outside of clusterstate.json
+   * @param seconds ttl value in seconds
+   */
+  public void setCollectionCacheTTl(int seconds){
+    assert seconds > 0;
+    timeToLive = seconds*1000L;
   }
 
   /**
@@ -178,8 +237,24 @@ public class CloudSolrServer extends Sol
     this.lbServer = lbServer;
     this.updatesToLeaders = updatesToLeaders;
     shutdownLBHttpSolrServer = false;
+    setupStateVerParamOnQueryString(lbServer);
+
   }
   
+  /**
+   * Used internally to setup the _stateVer_ param to be sent in the query string of requests
+   * coming from this instance.
+   */
+  protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) {
+    // setup the stateVer param to be passed in the query string of every request
+    Set<String> queryStringParams = lbServer.getQueryParams();
+    if (queryStringParams == null) {
+      queryStringParams = new HashSet<String>(2);
+      lbServer.setQueryParams(queryStringParams);
+    }
+    queryStringParams.add(STATE_VERSION);
+  }
+
   public ResponseParser getParser() {
     return lbServer.getParser();
   }
@@ -317,7 +392,7 @@ public class CloudSolrServer extends Sol
       }
     }
 
-    DocCollection col = clusterState.getCollection(collection);
+    DocCollection col = getDocCollection(clusterState, collection);
 
     DocRouter router = col.getRouter();
     
@@ -534,7 +609,146 @@ public class CloudSolrServer extends Sol
   }
 
   @Override
-  public NamedList<Object> request(SolrRequest request)
+  public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
+    SolrParams reqParams = request.getParams();
+    String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
+    return requestWithRetryOnStaleState(request, 0, collection);
+  }
+
+  /**
+   * As this class doesn't watch external collections on the client side,
+   * there's a chance that the request will fail due to cached stale state,
+   * which means the state must be refreshed from ZK and retried.
+   */
+  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
+      throws SolrServerException, IOException {
+
+    connect(); // important to call this before you start working with the ZkStateReader
+
+    // build up a _stateVer_ param to pass to the server containing all of the
+    // external collection state versions involved in this request, which allows
+    // the server to notify us that our cached state for one or more of the external
+    // collections is stale and needs to be refreshed ... this code has no impact on internal collections
+    String stateVerParam = null;
+    List<DocCollection> requestedCollections = null;
+    if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
+      Set<String> requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection);
+
+      StringBuilder stateVerParamBuilder = null;
+      for (String requestedCollection : requestedCollectionNames) {
+        // track the version of state we're using on the client side using the _stateVer_ param
+        DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
+        int collVer = coll.getZNodeVersion();
+        if (coll.getStateFormat()>1) {
+          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+          requestedCollections.add(coll);
+
+          if (stateVerParamBuilder == null) {
+            stateVerParamBuilder = new StringBuilder();
+          } else {
+            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
+          }
+
+          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+        }
+      }
+
+      if (stateVerParamBuilder != null) {
+        stateVerParam = stateVerParamBuilder.toString();
+      }
+    }
+
+    if (request.getParams() instanceof ModifiableSolrParams) {
+      ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+      if (stateVerParam != null) {
+        params.set(STATE_VERSION, stateVerParam);
+      } else {
+        params.remove(STATE_VERSION);
+      }
+    } // else: ??? how to set this ???
+
+    NamedList<Object> resp = null;
+    try {
+      resp = sendRequest(request);
+    } catch (Exception exc) {
+
+      Throwable rootCause = SolrException.getRootCause(exc);
+      // don't do retry support for admin requests or if the request doesn't have a collection specified
+      if (collection == null || request.getPath().startsWith("/admin")) {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException)exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException)exc;
+        }else if (exc instanceof RuntimeException) {
+          throw (RuntimeException) exc;
+        }
+        else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+
+      int errorCode = (rootCause instanceof SolrException) ?
+          ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
+
+      log.error("Request to collection {} failed due to ("+errorCode+
+          ") {}, retry? "+retryCount, collection, rootCause.toString());
+
+      boolean wasCommError =
+          (rootCause instanceof ConnectException ||
+              rootCause instanceof ConnectTimeoutException ||
+              rootCause instanceof NoHttpResponseException ||
+              rootCause instanceof SocketException);
+
+      boolean stateWasStale = false;
+      if (retryCount < MAX_STALE_RETRIES &&
+          requestedCollections != null && !requestedCollections.isEmpty() &&
+          SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
+      {
+        // cached state for one or more external collections was stale
+        // re-issue request using updated state
+        stateWasStale = true;
+
+        // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
+        for (DocCollection ext : requestedCollections) {
+          collectionStateCache.remove(ext.getName());
+        }
+      }
+
+      // if we experienced a communication error, it's worth checking the state
+      // with ZK just to make sure the node we're trying to hit is still part of the collection
+      if (retryCount < MAX_STALE_RETRIES && !stateWasStale && requestedCollections != null && !requestedCollections.isEmpty() && wasCommError) {
+        for (DocCollection ext : requestedCollections) {
+          DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName());
+          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+            // looks like we couldn't reach the server because the state was stale == retry
+            stateWasStale = true;
+            // we just pulled state from ZK, so update the cache so that the retry uses it
+            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
+          }
+        }
+      }
+
+      if (requestedCollections != null) requestedCollections.clear(); // done with this
+
+      // if the state was stale, then we retry the request once with new state pulled from Zk
+      if (stateWasStale) {
+        log.warn("Re-trying request to  collection(s) "+collection+" after stale state error from server.");
+        resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
+      } else {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException)exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException)exc;
+        } else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+    }
+
+    return resp;
+  }
+
+  protected NamedList<Object> sendRequest(SolrRequest request)
       throws SolrServerException, IOException {
     connect();
     
@@ -594,7 +808,7 @@ public class CloudSolrServer extends Sol
       // add it to the Map of slices.
       Map<String,Slice> slices = new HashMap<>();
       for (String collectionName : collectionsList) {
-        DocCollection col = clusterState.getCollection(collectionName);
+        DocCollection col = getDocCollection(clusterState, collectionName);
         Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
         ClientUtils.addSlices(slices, collectionName, routeSlices, true);
       }
@@ -654,14 +868,16 @@ public class CloudSolrServer extends Sol
         theUrlList = new ArrayList<>(urlList.size());
         theUrlList.addAll(urlList);
       }
+      if(theUrlList.isEmpty()) {
+        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Not enough nodes to handle the request");
+      }
+
       Collections.shuffle(theUrlList, rand);
       if (sendToLeaders) {
         ArrayList<String> theReplicas = new ArrayList<>(
             replicasList.size());
         theReplicas.addAll(replicasList);
         Collections.shuffle(theReplicas, rand);
-        // System.out.println("leaders:" + theUrlList);
-        // System.out.println("replicas:" + theReplicas);
         theUrlList.addAll(theReplicas);
       }
       
@@ -690,10 +906,13 @@ public class CloudSolrServer extends Sol
           collectionsList.addAll(aliasList);
           continue;
         }
-        
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+
+        DocCollection docCollection = getDocCollection(clusterState, collectionName);
+        if (docCollection == null) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+        }
       }
-      
+
       collectionsList.add(collectionName);
     }
     return collectionsList;
@@ -730,6 +949,19 @@ public class CloudSolrServer extends Sol
     return updatesToLeaders;
   }
 
+  protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
+    ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null;
+    if (cachedState != null && cachedState.cached != null) {
+      return cachedState.cached;
+    }
+
+    DocCollection col = clusterState.getCollectionOrNull(collection);
+    if(col == null ) return  null;
+    collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
+    return col;
+  }
+
+
   /**
    * Useful for determining the minimum achieved replication factor across
    * all shards involved in processing an update request, typically useful
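
Taken together, the SolrJ changes are transparent to callers: DocCollection state for external collections is cached per name with a TTL, every request carries _stateVer_, and a stale-state or connection-level failure triggers a bounded refresh-and-retry (MAX_STALE_RETRIES). The only new public knob is setCollectionCacheTTl. A short usage sketch follows; the ZooKeeper address, collection name and query are assumptions.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;

public class QueryExternalCollection {
  public static void main(String[] args) throws Exception {
    CloudSolrServer server = new CloudSolrServer("localhost:9983"); // assumed ZK address
    server.setDefaultCollection("myExternColl");
    server.setCollectionCacheTTl(30);   // cache external collection state for 30 seconds
    server.connect();
    try {
      // the _stateVer_ param is attached automatically; on a stale-state error the
      // request is retried with state re-read from ZooKeeper
      QueryResponse rsp = server.query(new SolrQuery("*:*"));
      System.out.println("hits: " + rsp.getResults().getNumFound());
    } finally {
      server.shutdown();
    }
  }
}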

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Fri Sep 12 14:11:17 2014
@@ -26,11 +26,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.noggit.JSONWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
+import org.noggit.JSONWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,9 +40,9 @@ import org.slf4j.LoggerFactory;
 public class ClusterState implements JSONWriter.Writable {
   private static Logger log = LoggerFactory.getLogger(ClusterState.class);
   
-  private Integer zkClusterStateVersion;
+  private Integer znodeVersion;
   
-  private final Map<String, DocCollection> collectionStates;  // Map<collectionName, Map<sliceName,Slice>>
+  private final Map<String, CollectionRef> collectionStates;
   private Set<String> liveNodes;
 
 
@@ -59,30 +57,42 @@ public class ClusterState implements JSO
     this(null, liveNodes, collectionStates);
   }
 
-
-  
   /**
    * Use this constr when ClusterState is meant for consumption.
    */
-  public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
+  public ClusterState(Integer znodeVersion, Set<String> liveNodes,
       Map<String, DocCollection> collectionStates) {
-    this.zkClusterStateVersion = zkClusterStateVersion;
+    this(liveNodes, getRefMap(collectionStates),znodeVersion);
+  }
+
+  private static Map<String, CollectionRef> getRefMap(Map<String, DocCollection> collectionStates) {
+    Map<String, CollectionRef> collRefs =  new LinkedHashMap<>(collectionStates.size());
+    for (Entry<String, DocCollection> entry : collectionStates.entrySet()) {
+      final DocCollection c = entry.getValue();
+      collRefs.put(entry.getKey(), new CollectionRef(c));
+    }
+    return collRefs;
+  }
+
+  /** Use this if all the collection states are not readily available and some need to be lazily loaded
+   */
+  public ClusterState(Set<String> liveNodes, Map<String, CollectionRef> collectionStates, Integer znodeVersion){
+    this.znodeVersion = znodeVersion;
     this.liveNodes = new HashSet<>(liveNodes.size());
     this.liveNodes.addAll(liveNodes);
-    this.collectionStates = new LinkedHashMap<>(collectionStates.size());
-    this.collectionStates.putAll(collectionStates);
-
+    this.collectionStates = new LinkedHashMap<>(collectionStates);
   }
 
+
   public ClusterState copyWith(Map<String,DocCollection> modified){
-    ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates);
+    ClusterState result = new ClusterState(liveNodes, new LinkedHashMap<>(collectionStates), znodeVersion);
     for (Entry<String, DocCollection> e : modified.entrySet()) {
       DocCollection c = e.getValue();
       if(c == null) {
         result.collectionStates.remove(e.getKey());
         continue;
       }
-      result.collectionStates.put(c.getName(), c);
+      result.collectionStates.put(c.getName(), new CollectionRef(c));
     }
     return result;
   }
@@ -117,7 +127,7 @@ public class ClusterState implements JSO
    * coreNodeName is the same as replicaName
    */
   public Replica getReplica(final String collection, final String coreNodeName) {
-    return getReplica(collectionStates.get(collection), coreNodeName);
+    return getReplica(getCollectionOrNull(collection), coreNodeName);
   }
 
   /**
@@ -165,7 +175,8 @@ public class ClusterState implements JSO
 
 
   public DocCollection getCollectionOrNull(String coll) {
-    return collectionStates.get(coll);
+    CollectionRef ref = collectionStates.get(coll);
+    return ref == null? null:ref.get();
   }
 
   /**
@@ -175,12 +186,6 @@ public class ClusterState implements JSO
     return collectionStates.keySet();
   }
 
-  /**
-   * @return Map&lt;collectionName, Map&lt;sliceName,Slice&gt;&gt;
-   */
-  public Map<String, DocCollection> getCollectionStates() {
-    return Collections.unmodifiableMap(collectionStates);
-  }
 
   /**
    * Get names of the currently live nodes.
@@ -194,14 +199,14 @@ public class ClusterState implements JSO
   }
 
   public String getShardId(String collectionName, String nodeName, String coreName) {
-    Collection<DocCollection> states = collectionStates.values();
+    Collection<CollectionRef> states = collectionStates.values();
     if (collectionName != null) {
-      DocCollection c = getCollectionOrNull(collectionName);
-      if (c != null) states = Collections.singletonList(c);
+      CollectionRef c = collectionStates.get(collectionName);
+      if (c != null) states = Collections.singletonList( c );
     }
 
-    for (DocCollection coll : states) {
-      for (Slice slice : coll.getSlices()) {
+    for (CollectionRef coll : states) {
+      for (Slice slice : coll.get().getSlices()) {
         for (Replica replica : slice.getReplicas()) {
           // TODO: for really large clusters, we could 'index' on this
           String rnodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
@@ -215,10 +220,10 @@ public class ClusterState implements JSO
     return null;
   }
   
-  public String getShardIdByCoreNodeName(String collectionName, String coreNodeName) {
+  /*public String getShardIdByCoreNodeName(String collectionName, String coreNodeName) {
     Collection<DocCollection> states = collectionStates.values();
     if (collectionName != null) {
-      DocCollection c = getCollectionOrNull(collectionName);
+      CollectionRef c = collectionStates.get(collectionName);
       if (c != null) states = Collections.singletonList(c);
     }
 
@@ -232,7 +237,7 @@ public class ClusterState implements JSO
       }
     }
     return null;
-  }
+  }*/
 
   /**
    * Check if node is alive. 
@@ -249,42 +254,31 @@ public class ClusterState implements JSO
     return sb.toString();
   }
 
-  /**
-   * Create ClusterState by reading the current state from zookeeper. 
-   */
-  public static ClusterState load(SolrZkClient zkClient, Set<String> liveNodes, ZkStateReader stateReader) throws KeeperException, InterruptedException {
-    Stat stat = new Stat();
-    byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
-        null, stat, true);
-    return load(stat.getVersion(), state, liveNodes);
+  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
+    return load(version, bytes, liveNodes, ZkStateReader.CLUSTER_STATE);
   }
-  
- 
   /**
    * Create ClusterState from json string that is typically stored in zookeeper.
    * 
-   * Use {@link ClusterState#load(SolrZkClient, Set, ZkStateReader)} instead, unless you want to
-   * do something more when getting the data - such as get the stat, set watch, etc.
    * @param version zk version of the clusterstate.json file (bytes)
    * @param bytes clusterstate.json as a byte array
    * @param liveNodes list of live nodes
    * @return the ClusterState
    */
-  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
+  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, String znode) {
     // System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
     if (bytes == null || bytes.length == 0) {
       return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
     }
     Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
-    Map<String,DocCollection> collections = new LinkedHashMap<>(stateMap.size());
+    Map<String,CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
       String collectionName = entry.getKey();
-      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version);
-      collections.put(collectionName, coll);
+      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
+      collections.put(collectionName, new CollectionRef(coll));
     }
 
-    // System.out.println("######## ClusterState.load result:" + collections);
-    return new ClusterState( version, liveNodes, collections);
+    return new ClusterState( liveNodes, collections,version);
   }
 
 
@@ -297,7 +291,7 @@ public class ClusterState implements JSO
     return new Aliases(aliasMap);
   }
 
-  private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version) {
+  private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
     Map<String,Object> props;
     Map<String,Slice> slices;
 
@@ -324,7 +318,7 @@ public class ClusterState implements JSO
       router = DocRouter.getDocRouter((String) routerProps.get("name"));
     }
 
-    return new DocCollection(name, slices, props, router, version);
+    return new DocCollection(name, slices, props, router, version, znode);
   }
 
   private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
@@ -344,7 +338,28 @@ public class ClusterState implements JSO
 
   @Override
   public void write(JSONWriter jsonWriter) {
-    jsonWriter.write(collectionStates);
+    if (collectionStates.size() == 1) {
+      CollectionRef ref = collectionStates.values().iterator().next();
+      DocCollection docCollection = ref.get();
+      if (docCollection.getStateFormat() > 1) {
+        jsonWriter.write(Collections.singletonMap(docCollection.getName(), docCollection));
+        // serializing a single DocCollection that is persisted outside of clusterstate.json
+        return;
+      }
+    }
+
+    LinkedHashMap<String , DocCollection> map = new LinkedHashMap<>();
+    for (Entry<String, CollectionRef> e : collectionStates.entrySet()) {
+      // this class check avoids fetching from ZK for lazily loaded collections
+      if (e.getValue().getClass() == CollectionRef.class) {
+        // only include collections that are still persisted in clusterstate.json (stateFormat == 1)
+        DocCollection coll = e.getValue().get();
+        if (coll.getStateFormat() == 1) {
+          map.put(coll.getName(),coll);
+        }
+      }
+    }
+    jsonWriter.write(map);
   }
 
   /**
@@ -353,7 +368,7 @@ public class ClusterState implements JSO
    * @return null if ClusterState was created for publication, not consumption
    */
   public Integer getZkClusterStateVersion() {
-    return zkClusterStateVersion;
+    return znodeVersion;
   }
 
   @Override
@@ -361,7 +376,7 @@ public class ClusterState implements JSO
     final int prime = 31;
     int result = 1;
     result = prime * result
-        + ((zkClusterStateVersion == null) ? 0 : zkClusterStateVersion.hashCode());
+        + ((znodeVersion == null) ? 0 : znodeVersion.hashCode());
     result = prime * result + ((liveNodes == null) ? 0 : liveNodes.hashCode());
     return result;
   }
@@ -372,9 +387,9 @@ public class ClusterState implements JSO
     if (obj == null) return false;
     if (getClass() != obj.getClass()) return false;
     ClusterState other = (ClusterState) obj;
-    if (zkClusterStateVersion == null) {
-      if (other.zkClusterStateVersion != null) return false;
-    } else if (!zkClusterStateVersion.equals(other.zkClusterStateVersion)) return false;
+    if (znodeVersion == null) {
+      if (other.znodeVersion != null) return false;
+    } else if (!znodeVersion.equals(other.znodeVersion)) return false;
     if (liveNodes == null) {
       if (other.liveNodes != null) return false;
     } else if (!liveNodes.equals(other.liveNodes)) return false;
@@ -390,8 +405,22 @@ public class ClusterState implements JSO
     this.liveNodes = liveNodes;
   }
 
-  public DocCollection getCommonCollection(String name){
-    return collectionStates.get(name);
+  /**For internal use only
+   */
+  Map<String, CollectionRef> getCollectionStates() {
+    return collectionStates;
+  }
+
+  public static class CollectionRef {
+    private final DocCollection coll;
+
+    public CollectionRef(DocCollection coll) {
+      this.coll = coll;
+    }
+
+    public DocCollection get(){
+      return coll;
+    }
 
   }
 

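For readers following the ClusterState changes above: the new CollectionRef indirection lets one map hold both collections that are already materialized in memory and collections whose state lives in a separate znode and should only be fetched on demand. The sketch below is illustration only, not part of this commit; the class name LazyRefSketch and the fetchFromZk placeholder are made up, and the real counterpart of this pattern is constructState()/getCollectionLive() in ZkStateReader further down in this patch.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClusterState.CollectionRef;
import org.apache.solr.common.cloud.DocCollection;

public class LazyRefSketch {

  // hypothetical loader; in the patch this role is played by ZkStateReader.getCollectionLive(...)
  static DocCollection fetchFromZk(String collName) {
    throw new UnsupportedOperationException("placeholder for a ZooKeeper read");
  }

  static ClusterState buildState(Set<String> liveNodes,
                                 Map<String, CollectionRef> eagerRefs,
                                 final String lazyCollName,
                                 Integer znodeVersion) {
    // collections already held in memory are wrapped directly ...
    Map<String, CollectionRef> refs = new LinkedHashMap<>(eagerRefs);
    // ... while a collection stored in its own state.json gets a CollectionRef
    // whose get() fetches the DocCollection just in time
    refs.put(lazyCollName, new CollectionRef(null) {
      @Override
      public DocCollection get() {
        return fetchFromZk(lazyCollName);
      }
    });
    // uses the new ClusterState(liveNodes, collectionStates, znodeVersion) constructor added above
    return new ClusterState(liveNodes, refs, znodeVersion);
  }
}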
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java Fri Sep 12 14:11:17 2014
@@ -34,12 +34,14 @@ import org.noggit.JSONWriter;
 public class DocCollection extends ZkNodeProps {
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
-  private int version;
+  public static final String STATE_FORMAT = "stateFormat";
+  private int znodeVersion;
 
   private final String name;
   private final Map<String, Slice> slices;
   private final Map<String, Slice> activeSlices;
   private final DocRouter router;
+  private final String znode;
 
   private final Integer replicationFactor;
   private final Integer maxShardsPerNode;
@@ -47,7 +49,7 @@ public class DocCollection extends ZkNod
 
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
-    this(name, slices, props, router, -1);
+    this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE);
   }
 
   /**
@@ -55,9 +57,9 @@ public class DocCollection extends ZkNod
    * @param slices The logical shards of the collection.  This is used directly and a copy is not made.
    * @param props  The properties of the slice.  This is used directly and a copy is not made.
    */
-  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) {
-    super( props==null ? props = new HashMap<String,Object>() : props);
-    this.version = zkVersion;
+  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
+    super(props==null ? props = new HashMap<String,Object>() : props);
+    this.znodeVersion = zkVersion;
     this.name = name;
 
     this.slices = slices;
@@ -89,10 +91,17 @@ public class DocCollection extends ZkNod
         this.activeSlices.put(slice.getKey(), slice.getValue());
     }
     this.router = router;
-
+    this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
     assert name != null && slices != null;
   }
 
+  /** Use this to make an exact copy of this DocCollection with a new set of Slices and every other property unchanged
+   * @param slices the new set of Slices
+   * @return the resulting DocCollection
+   */
+  public DocCollection copyWithSlices(Map<String, Slice> slices){
+    return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
+  }
 
   /**
    * Return collection name.
@@ -134,10 +143,13 @@ public class DocCollection extends ZkNod
     return activeSlices;
   }
 
-  public int getVersion(){
-    return version;
+  public int getZNodeVersion(){
+    return znodeVersion;
+  }
+
+  public int getStateFormat() {
+    return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1 : 2;
   }
-  
   /**
    * @return replication factor for this collection or null if no
    *         replication factor exists.
@@ -157,6 +169,10 @@ public class DocCollection extends ZkNod
     return maxShardsPerNode;
   }
 
+  public String getZNode(){
+    return znode;
+  }
+
 
   public DocRouter getRouter() {
     return router;
@@ -174,4 +190,12 @@ public class DocCollection extends ZkNod
     all.put(SHARDS, slices);
     jsonWriter.write(all);
   }
+
+  public Replica getReplica(String coreNodeName) {
+    for (Slice slice : slices.values()) {
+      Replica replica = slice.getReplica(coreNodeName);
+      if (replica != null) return replica;
+    }
+    return null;
+  }
 }

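The DocCollection changes above introduce the notion of a state format: getStateFormat() returns 1 when the collection is still serialized into the shared clusterstate.json, and 2 when it has its own znode (getZNode()). A minimal sketch of how a caller might branch on that, using only the accessors added in this patch; the class and method names of the sketch itself are illustrative.

import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;

public class StateFormatSketch {

  /** Describes where a collection's state is read from, for the two formats handled by this change. */
  static String describe(DocCollection coll) {
    if (coll.getStateFormat() == 1) {
      // format 1: the collection lives in the shared clusterstate.json
      return ZkStateReader.CLUSTER_STATE + " (shared, znode version " + coll.getZNodeVersion() + ")";
    }
    // format 2: the collection has its own state znode under /collections/<name>
    return coll.getZNode() + " (per-collection, znode version " + coll.getZNodeVersion() + ")";
  }
}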
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Sep 12 14:11:17 2014
@@ -38,12 +38,14 @@ import java.io.UnsupportedEncodingExcept
 import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -100,9 +102,14 @@ public class ZkStateReader implements Cl
   public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
 
   public static final String SHARD_LEADERS_ZKNODE = "leaders";
+  private final Set<String> watchedCollections = new HashSet<String>();
+
+  /** These are the collections that are actively watched by this instance.
+   *
+   */
+  private Map<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
 
 
-  
   //
   // convenience methods... should these go somewhere else?
   //
@@ -163,7 +170,8 @@ public class ZkStateReader implements Cl
           log.info("path={} {}={} specified config exists in ZooKeeper",
               new Object[] {path, CONFIGNAME_PROP, configName});
         }
-
+      } else {
+        throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
       }
     }
     catch (KeeperException e) {
@@ -252,22 +260,26 @@ public class ZkStateReader implements Cl
     return aliases;
   }
 
-  /*public Boolean checkValid(String coll, int version){
+  public Boolean checkValid(String coll, int version) {
     DocCollection collection = clusterState.getCollectionOrNull(coll);
-    if(collection ==null) return null;
-    if(collection.getVersion() < version){
-      log.info("server older than client {}<{}",collection.getVersion(),version);
-      DocCollection nu = getExternCollectionFresh(this, coll);
-      if(nu.getVersion()> collection.getVersion()){
-        updateExternCollection(nu);
+    if (collection == null) return null;
+    if (collection.getZNodeVersion() < version) {
+      log.debug("server older than client {}<{}", collection.getZNodeVersion(), version);
+      DocCollection nu = getCollectionLive(this, coll);
+      if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
+        updateWatchedCollection(nu);
         collection = nu;
       }
     }
-    if(collection.getVersion() == version) return Boolean.TRUE;
-    log.info("wrong version from client {}!={} ",version, collection.getVersion());
+    
+    if (collection.getZNodeVersion() == version) {
+      return Boolean.TRUE;
+    }
+    
+    log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion());
+    
     return Boolean.FALSE;
-
-  }*/
+  }
   
   public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
       InterruptedException {
@@ -296,18 +308,9 @@ public class ZkStateReader implements Cl
             synchronized (ZkStateReader.this.getUpdateLock()) {
               // remake watch
               final Watcher thisWatch = this;
-              Stat stat = new Stat();
-              byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
-                  true);
               Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
-              ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln);
               // update volatile
-              ZkStateReader.this.clusterState = clusterState;
-
-//              HashSet<String> all = new HashSet<>(colls);;
-//              all.addAll(clusterState.getAllInternalCollections());
-//              all.remove(null);
-
+              ZkStateReader.this.clusterState = constructState(ln, thisWatch);
             }
           } catch (KeeperException e) {
             if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -376,8 +379,7 @@ public class ZkStateReader implements Cl
     
       Set<String> liveNodeSet = new HashSet<>();
       liveNodeSet.addAll(liveNodes);
-      ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
-      this.clusterState = clusterState;
+      this.clusterState = constructState(liveNodeSet, null);
 
       zkClient.exists(ALIASES,
           new Watcher() {
@@ -423,9 +425,69 @@ public class ZkStateReader implements Cl
           }, true);
     }
     updateAliases();
+    //on reconnect of SolrZkClient re-add watchers for the watched external collections
+    synchronized (this) {
+      for (String watchedCollection : watchedCollections) {
+        addZkWatch(watchedCollection);
+      }
+    }
+  }
+
+  private ClusterState constructState(Set<String> ln, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    Stat stat = new Stat();
+    byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
+    ClusterState loadedData = ClusterState.load(stat.getVersion(), data, ln,
+        CLUSTER_STATE);
+    Map<String,ClusterState.CollectionRef> result = new LinkedHashMap<>();
+    result.putAll(loadedData.getCollectionStates()); // first load all collections in clusterstate.json
+    for (String s : getIndividualColls()) {
+      DocCollection watched = watchedCollectionStates.get(s);
+      if (watched != null) {
+        // if it is a watched collection, add too
+        result.put(s, new ClusterState.CollectionRef(watched));
+      } else {
+        // if it is not a watched collection, just create a reference that can fetch
+        // the collection object just in time from ZK
+        final String collName = s;
+        result.put(s, new ClusterState.CollectionRef(null) {
+          @Override
+          public DocCollection get() {
+            return getCollectionLive(ZkStateReader.this, collName);
+          }
+        });
+      }
+    }
+    return new ClusterState(ln, result, stat.getVersion());
   }
 
 
+  private Set<String> getIndividualColls() throws KeeperException, InterruptedException {
+    List<String> children = null;
+    try {
+      children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
+    } catch (KeeperException.NoNodeException e) {
+      log.warn("Error fetching collection names");
+      
+      return new HashSet<>();
+    }
+    if (children == null || children.isEmpty()) return new HashSet<>();
+    HashSet<String> result = new HashSet<>(children.size());
+    
+    for (String c : children) {
+      try {
+        if (zkClient.exists(getCollectionPath(c), true)) {
+          result.add(c);
+        }
+      } catch (Exception e) {
+        log.warn("Error reading collections nodes", e);
+      }
+    }
+    return result;
+  }
+
   // load and publish a new CollectionInfo
   private synchronized void updateClusterState(boolean immediate,
       final boolean onlyLiveNodes) throws KeeperException,
@@ -443,7 +505,7 @@ public class ZkStateReader implements Cl
         if (!onlyLiveNodes) {
           log.debug("Updating cloud state from ZooKeeper... ");
           
-          clusterState = ClusterState.load(zkClient, liveNodesSet,this);
+          clusterState = constructState(liveNodesSet,null);
         } else {
           log.debug("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
           clusterState = this.clusterState;
@@ -451,6 +513,11 @@ public class ZkStateReader implements Cl
         }
         this.clusterState = clusterState;
       }
+      synchronized (ZkStateReader.this) {
+        for (String watchedCollection : watchedCollections) {
+          updateWatchedCollection(getCollectionLive(this, watchedCollection));
+        }
+      }
 
     } else {
       if (clusterStateUpdateScheduled) {
@@ -475,13 +542,12 @@ public class ZkStateReader implements Cl
               
               if (!onlyLiveNodes) {
                 log.debug("Updating cloud state from ZooKeeper... ");
-                
-                clusterState = ClusterState.load(zkClient, liveNodesSet,ZkStateReader.this);
+
+                clusterState = constructState(liveNodesSet,null);
               } else {
                 log.debug("Updating live nodes from ZooKeeper... ");
                 clusterState = ZkStateReader.this.clusterState;
                 clusterState.setLiveNodes(liveNodesSet);
-
               }
               
               ZkStateReader.this.clusterState = clusterState;
@@ -504,13 +570,18 @@ public class ZkStateReader implements Cl
             } 
             // update volatile
             ZkStateReader.this.clusterState = clusterState;
+
+            synchronized (ZkStateReader.this) {
+              for (String watchedCollection : watchedCollections) {
+                updateWatchedCollection(getCollectionLive(ZkStateReader.this, watchedCollection));
+              }
+            }
           }
         }
       }, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
     }
-
   }
-   
+
   /**
    * @return information about the cluster from ZooKeeper
    */
@@ -679,4 +750,131 @@ public class ZkStateReader implements Cl
     }
   }
 
+  public static DocCollection getCollectionLive(ZkStateReader zkStateReader,
+      String coll) {
+    String collectionPath = getCollectionPath(coll);
+    try {
+      if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null;
+      Stat stat = new Stat();
+      byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
+      ClusterState state = ClusterState.load(stat.getVersion(), data,
+          Collections.<String> emptySet(), collectionPath);
+      ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
+      return collectionRef == null ? null : collectionRef.get();
+    } catch (KeeperException.NoNodeException e) {
+      log.warn("No node available : " + collectionPath, e);
+      return null;
+    } catch (KeeperException e) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          "Could not load collection from ZK:" + coll, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          "Could not load collection from ZK:" + coll, e);
+    }
+  }
+
+  public static String getCollectionPath(String coll) {
+    return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
+  }
+
+  public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
+    synchronized (this) {
+      if (watchedCollections.contains(coll)) return;
+      else {
+        watchedCollections.add(coll);
+      }
+      addZkWatch(coll);
+    }
+  }
+
+  private void addZkWatch(final String coll) throws KeeperException,
+      InterruptedException {
+    log.info("addZkWatch {}", coll);
+    final String fullpath = getCollectionPath(coll);
+    synchronized (getUpdateLock()) {
+      
+      cmdExecutor.ensureExists(fullpath, zkClient);
+      log.info("Updating collection state at {} from ZooKeeper... ", fullpath);
+      
+      Watcher watcher = new Watcher() {
+        
+        @Override
+        public void process(WatchedEvent event) {
+          // session events are not change events,
+          // and do not remove the watcher
+          if (EventType.None.equals(event.getType())) {
+            return;
+          }
+          log.info("A cluster state change: {}, has occurred - updating... ",
+              (event), ZkStateReader.this.clusterState == null ? 0
+                  : ZkStateReader.this.clusterState.getLiveNodes().size());
+          try {
+            
+            // delayed approach
+            // ZkStateReader.this.updateClusterState(false, false);
+            synchronized (ZkStateReader.this.getUpdateLock()) {
+              if (!watchedCollections.contains(coll)) {
+                log.info("Unwatched collection {}", coll);
+                return;
+              }
+              // remake watch
+              final Watcher thisWatch = this;
+              Stat stat = new Stat();
+              byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
+              
+              if (data == null || data.length == 0) {
+                log.warn("No value set for collection state : {}", coll);
+                return;
+                
+              }
+              ClusterState clusterState = ClusterState.load(stat.getVersion(),
+                  data, Collections.<String> emptySet(), fullpath);
+              // update volatile
+              
+              DocCollection newState = clusterState.getCollectionStates()
+                  .get(coll).get();
+              updateWatchedCollection(newState);
+              
+            }
+          } catch (KeeperException e) {
+            if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+              return;
+            }
+            log.error("Unwatched collection :" + coll, e);
+            throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+            
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Unwatched collection :" + coll, e);
+            return;
+          }
+        }
+        
+      };
+      zkClient.exists(fullpath, watcher, true);
+    }
+    updateWatchedCollection(getCollectionLive(this, coll));
+  }
+  
+  private void updateWatchedCollection(DocCollection newState) {
+    watchedCollectionStates.put(newState.getName(), newState);
+    log.info("Updating data for {} to ver {} ", newState.getName(),
+        newState.getZNodeVersion());
+    
+    this.clusterState = clusterState.copyWith(Collections.singletonMap(
+        newState.getName(), newState));
+  }
+  
+  /** This is not a public API. Only used by ZkController */
+  public void removeZKWatch(final String coll) {
+    synchronized (this) {
+      watchedCollections.remove(coll);
+      clusterState = clusterState.copyWith(Collections
+          .<String,DocCollection> singletonMap(coll, null));
+    }
+  }
+
 }

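ZkStateReader now tracks a set of watched collections and keeps their DocCollection objects in watchedCollectionStates, refreshing them from the per-collection znode whenever the watch fires. A rough sketch of that lifecycle from a caller's point of view (illustration only; per the javadoc above, removeZKWatch is intended for ZkController, and the class and method names of the sketch are made up):

import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;

public class SelectiveWatchSketch {

  static void watchAndRead(ZkStateReader zkStateReader, String coll) throws Exception {
    // start watching only this collection's own state znode
    zkStateReader.addCollectionWatch(coll);

    // subsequent reads are served from the locally cached, watched state
    DocCollection state = zkStateReader.getClusterState().getCollectionOrNull(coll);
    if (state != null) {
      System.out.println(coll + " is at znode version " + state.getZNodeVersion());
    }

    // when the collection is no longer hosted here, drop the watch
    zkStateReader.removeZKWatch(coll);
  }
}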
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java?rev=1624556&r1=1624555&r2=1624556&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java Fri Sep 12 14:11:17 2014
@@ -20,7 +20,6 @@ package org.apache.solr.client.solrj.imp
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
 import org.apache.solr.cloud.AbstractZkTestCase;
-import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -51,7 +49,6 @@ import org.apache.solr.common.cloud.DocC
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -62,7 +59,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,6 +121,7 @@ public class CloudSolrServerTest extends
   @Override
   public void doTest() throws Exception {
     allTests();
+    stateVersionParamTest();
   }
 
   private void allTests() throws Exception {
@@ -345,7 +342,77 @@ public class CloudSolrServerTest extends
     SolrInputDocument doc = getDoc(fields);
     indexDoc(doc);
   }
-  
+
+  private void stateVersionParamTest() throws Exception {
+    CloudSolrServer client = createCloudClient(null);
+    try {
+      String collectionName = "checkStateVerCol";
+      createCollection(collectionName, client, 2, 2);
+      waitForRecoveriesToFinish(collectionName, false);
+      DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
+      Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
+
+      HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName);
+
+
+      SolrQuery q = new SolrQuery().setQuery("*:*");
+
+      log.info("should work query, result {}", httpSolrServer.query(q));
+      //no problem
+      q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
+      log.info("2nd query , result {}", httpSolrServer.query(q));
+      //no error yet good
+
+      q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error
+
+      HttpSolrServer.RemoteSolrException sse = null;
+      try {
+        httpSolrServer.query(q);
+        log.info("expected query error");
+      } catch (HttpSolrServer.RemoteSolrException e) {
+        sse = e;
+      }
+      httpSolrServer.shutdown();
+      assertNotNull(sse);
+      assertEquals(" Error code should be ",  sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
+
+      // now send the request to another node that does not serve the collection
+
+      Set<String> allNodesOfColl = new HashSet<>();
+      for (Slice slice : coll.getSlices()) {
+        for (Replica replica : slice.getReplicas()) {
+          allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
+        }
+      }
+      String theNode = null;
+      for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
+        String n = client.getZkStateReader().getBaseUrlForNodeName(s);
+        if (!allNodesOfColl.contains(n)) {
+          theNode = n;
+          break;
+        }
+      }
+      log.info("thenode which does not serve this collection{} ",theNode);
+      assertNotNull(theNode);
+      httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName);
+
+      q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
+
+      try {
+        httpSolrServer.query(q);
+        log.info("error was expected");
+      } catch (HttpSolrServer.RemoteSolrException e) {
+        sse = e;
+      }
+      httpSolrServer.shutdown();
+      assertNotNull(sse);
+      assertEquals(" Error code should be ",  sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
+    } finally {
+      client.shutdown();
+    }
+
+  }
+
   public void testShutdown() throws MalformedURLException {
     CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332");
     try {



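The stateVersionParamTest above exercises the new STATE_VERSION request parameter end to end: the client sends collectionName:znodeVersion and the server answers with an INVALID_STATE error when the versions no longer match. On the server side the comparison is done by the now un-commented ZkStateReader.checkValid(coll, version). A hedged sketch of how such a check could be wired up; the parameter parsing, the class name and the error message below are assumptions for illustration, not the actual request plumbing elsewhere in this commit.

import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;

public class StateVersionCheckSketch {

  static void verify(ZkStateReader reader, String stateVerParam) {
    String[] parts = stateVerParam.split(":");   // e.g. "checkStateVerCol:3", as sent by the test above
    String coll = parts[0];
    int clientVersion = Integer.parseInt(parts[1]);

    Boolean valid = reader.checkValid(coll, clientVersion);
    if (Boolean.FALSE.equals(valid)) {
      // the client's cached state is out of step with what this node has
      throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
          "stale state version for collection " + coll);
    }
    // valid == null: the collection is unknown here; valid == TRUE: versions match
  }
}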