lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1203340 - in /lucene/dev/branches/solrcloud/solr/core/src: java/org/apache/solr/update/processor/ test/org/apache/solr/cloud/
Date Thu, 17 Nov 2011 19:22:53 GMT
Author: markrmiller
Date: Thu Nov 17 19:22:53 2011
New Revision: 1203340

URL: http://svn.apache.org/viewvc?rev=1203340&view=rev
Log:
take a bunch of crap that we won't use out of the distrib update processor - refactor code
from the factory getInstance method to individual process add/delete methods because we need
to hash before that code - add in some super fake hash to shard mapping - enable assert in
test that wasn't going to work until we actually did some hashing to figure out which shard
to address

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1203340&r1=1203339&r2=1203340&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Thu Nov 17 19:22:53 2011
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -44,9 +46,16 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
@@ -59,9 +68,12 @@ import org.apache.solr.update.UpdateHand
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.VersionBucket;
 import org.apache.solr.update.VersionInfo;
+import org.apache.zookeeper.KeeperException;
 
 // NOT mt-safe... create a new processor for each add thread
 public class DistributedUpdateProcessor extends UpdateRequestProcessor {
+  public static final String SEEN_LEADER = "leader";
+  
   // TODO: shut this thing down
   static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
       5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
@@ -84,16 +96,10 @@ public class DistributedUpdateProcessor 
   private final SchemaField idField;
   
   //private List<String> shards;
-  private final List<AddUpdateCommand>[] adds;
-  private final List<DeleteUpdateCommand>[] deletes;
-  
-  String selfStr;
 
   int maxBufferedAddsPerServer = 10;
   int maxBufferedDeletesPerServer = 100;
 
-  private DistributedUpdateProcessorFactory factory;
-
   private static final String VERSION_FIELD = "_version_";
   private final UpdateHandler updateHandler;
   private final UpdateLog ulog;
@@ -104,45 +110,22 @@ public class DistributedUpdateProcessor 
   private NamedList addsResponse = null;
   private NamedList deleteResponse = null;
   private CharsRef scratch;
-  private final boolean isLeader;
+  private boolean isLeader;
   private boolean forwardToLeader;
   private volatile String shardStr;
+
+  private List<AddUpdateCommand> alist;
+
+  private ArrayList<DeleteUpdateCommand> dlist;
   
-  public DistributedUpdateProcessor(String shardStr, SolrQueryRequest req,
-      SolrQueryResponse rsp, DistributedUpdateProcessorFactory factory,
-      boolean isLeader, boolean forwardToLeader, UpdateRequestProcessor next) {
+  public DistributedUpdateProcessor(SolrQueryRequest req,
+      SolrQueryResponse rsp, UpdateRequestProcessor next) {
     super(next);
-    this.shardStr = shardStr;
-    this.factory = factory;
     this.rsp = rsp;
     this.next = next;
     this.idField = req.getSchema().getUniqueKeyField();
-
-    
-    String selfStr = req.getParams().get("self", factory.selfStr);
-    
-//    self = -1;
-//    if (shards != null) {
-//      for (int i = 0; i < shards.size(); i++) {
-//        if (shards.get(i).equals(selfStr)) {
-//          self = i;
-//          break;
-//        }
-//      }
-//    }
-//    
-//    if (shards == null) {
-//      shards = new ArrayList<String>(1);
-//      shards.add("self");
-//      self = 0;
-//    }
-    
-    adds = new List[1];
-    deletes = new List[1];
     
     // version init
-    this.isLeader = isLeader;
-    this.forwardToLeader = forwardToLeader;
 
     this.updateHandler = req.getCore().getUpdateHandler();
     this.ulog = updateHandler.getUpdateLog();
@@ -157,17 +140,110 @@ public class DistributedUpdateProcessor 
 
   }
 
-//  private int getSlot(String id) {
-//    return (id.hashCode() >>> 1) % shards.size();
-//  }
+  private void setupRequest(int hash) {
+    System.out.println("hash:" + hash);
+    CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
+    
+    
+    String shardId = getShard(hash); // get the right shard based on the hash...
+    
+    
+    // TODO: first thing we actually have to do here is get a hash so we can send to the
right shard...
+    // to do that, most of this likely has to move
+    
+    // if we are in zk mode...
+    if (coreDesc.getCoreContainer().getZkController() != null) {
+      // the leader is...
+      // TODO: if there is no leader, wait and look again
+      // TODO: we are reading the leader from zk every time - we should cache
+      // this and watch for changes??
+     
+      String collection = coreDesc.getCloudDescriptor().getCollectionName();
+
+      
+      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+      
+      String leaderNode = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
+          + ZkStateReader.LEADER_ELECT_ZKNODE + "/" + shardId + "/leader";
+      SolrZkClient zkClient = coreDesc.getCoreContainer().getZkController()
+          .getZkClient();
+
+      try {
+        List<String> leaderChildren = zkClient.getChildren(leaderNode, null);
+        if (leaderChildren.size() > 0) {
+          String leader = leaderChildren.get(0);
+          
+          ZkNodeProps zkNodeProps = new ZkNodeProps();
+          byte[] bytes = zkClient
+              .getData(leaderNode + "/" + leader, null, null);
+          zkNodeProps.load(bytes);
+          
+          String leaderUrl = zkNodeProps.get("url");
+          
+          String nodeName = req.getCore().getCoreDescriptor()
+              .getCoreContainer().getZkController().getNodeName();
+          String shardZkNodeName = nodeName + "_" + req.getCore().getName();
+
+          System.out.println("params:" + params);
+          if (params.getBool(SEEN_LEADER, false)) {
+            // we got a version, just go local - add no shards param
+            
+            // still mark if i am the leader though
+            if (shardZkNodeName.equals(leader)) {
+              isLeader = true;
+            }
+            System.out.println(" go local");
+          } else if (shardZkNodeName.equals(leader)) {
+            isLeader = true;
+            // that means I want to forward onto my replicas...
+            // so get the replicas...
+            shardStr = addReplicas(req, collection, shardId,
+                shardZkNodeName);
+            
+            // mark that this req has been to the leader
+            params.set(SEEN_LEADER, true);
+            System.out.println("mark leader seen");
+          } else {
+            // I need to forward onto the leader...
+            // first I must hash...
+            shardStr = leaderUrl;
+            forwardToLeader  = true;
+          }
+          System.out.println("set params on req:" + params);
+          req.setParams(params);
+        }
+      } catch (KeeperException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      } catch (IOException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      }
+    }
+  }
   
+  private String getShard(int hash) {
+    if (hash < -715827884) {
+      return "shard1";
+    } else if (hash < 715827881) {
+      return "shard2";
+    } else {
+      return "shard3";
+    }
+      
+  }
+
   @Override
   public void processAdd(AddUpdateCommand cmd) throws IOException {
+    int hash = hash(cmd);
     
-    versionAdd(cmd);
+    setupRequest(hash);
+    versionAdd(cmd, hash);
     
-//    String shardStr = null;
-//    this.shardStr = shardStr;
     distribAdd(cmd);
     
     if (returnVersions && rsp != null) {
@@ -186,22 +262,17 @@ public class DistributedUpdateProcessor 
     // processor too.
   }
 
-  private void versionAdd(AddUpdateCommand cmd) throws IOException {
+  private void versionAdd(AddUpdateCommand cmd, int hash) throws IOException {
     if (vinfo == null) {
       super.processAdd(cmd);
       return;
     }
 
-    boolean leaderForUpdate = isLeader;
     System.out.println("LeaderParam:"
-        + req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER));
-   // leaderForUpdate = req.getParams().getBool(
-    //    VersionProcessorFactory.SEEN_LEADER, false); // TODO: we need a better
-                                                    // indicator of when an
-                                                    // update comes from a
-                                                    // leader
+        + req.getParams().get(SEEN_LEADER));
+
 
-    System.out.println("leader? " + leaderForUpdate);
+    System.out.println("leader? " + isLeader);
     if (forwardToLeader) {
       // TODO: forward update to the leader
       System.out.println("forward to leader");
@@ -223,7 +294,7 @@ public class DistributedUpdateProcessor 
 
 
 
-    VersionBucket bucket = vinfo.bucket(hash(cmd));
+    VersionBucket bucket = vinfo.bucket(hash);
     synchronized (bucket) {
       // we obtain the version when synchronized and then do the add so we can ensure that
       // if version1 < version2 then version1 is actually added before version2.
@@ -236,7 +307,7 @@ public class DistributedUpdateProcessor 
       if (versionsStored) {
         long bucketVersion = bucket.highest;
 
-        if (leaderForUpdate) {
+        if (isLeader) {
           long version = vinfo.getNewClock();
           cmd.setVersion(version);
           cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
@@ -284,15 +355,9 @@ public class DistributedUpdateProcessor 
       if (next != null) next.processAdd(cmd);
       return;
     }
-    String id = field.getFirstValue().toString();
-//    int slot = getSlot(id);
-//    if (slot == self) {
-//      if (next != null) next.processAdd(cmd);
-//      return;
-//    }
     
     // make sure any pending deletes are flushed
-    flushDeletes(0, 1, null);
+    flushDeletes(1, null);
     
     // TODO: this is brittle
     // need to make a clone since these commands may be reused
@@ -306,14 +371,13 @@ public class DistributedUpdateProcessor 
     // clone.indexedId = cmd.indexedId;
     // clone.doc = cmd.doc;
     
-    List<AddUpdateCommand> alist = adds[0];
+
     if (alist == null) {
       alist = new ArrayList<AddUpdateCommand>(2);
-      adds[0] = alist;
     }
     alist.add(clone);
     
-    flushAdds(0, maxBufferedAddsPerServer, null);
+    flushAdds(maxBufferedAddsPerServer, null);
   }
   
   // TODO: this is brittle
@@ -326,24 +390,28 @@ public class DistributedUpdateProcessor 
   
   private void doDelete(DeleteUpdateCommand cmd) throws IOException {
     
-    flushAdds(0, 1, null);
+    flushAdds(1, null);
     
-    List<DeleteUpdateCommand> dlist = deletes[0];
     if (dlist == null) {
       dlist = new ArrayList<DeleteUpdateCommand>(2);
-      deletes[0] = dlist;
     }
     dlist.add(clone(cmd));
     
-    flushDeletes(0, maxBufferedDeletesPerServer, null);
+    flushDeletes(maxBufferedDeletesPerServer, null);
   }
   
   @Override
   public void processDelete(DeleteUpdateCommand cmd) throws IOException {
-//    String shardStr = null;
-//    this.shardStr = shardStr;
+    int hash = 0;
+    if (cmd.getIndexedId() == null) {
+      // delete by query...
+    } else {
+      hash = hash(cmd);
+    }
     
-    versionDelete(cmd);
+    setupRequest(hash);
+    
+    versionDelete(cmd, hash);
     
     distribDelete(cmd);
 
@@ -359,7 +427,7 @@ public class DistributedUpdateProcessor 
     }
   }
 
-  private void versionDelete(DeleteUpdateCommand cmd) throws IOException {
+  private void versionDelete(DeleteUpdateCommand cmd, int hash) throws IOException {
     if (cmd == null) {
       throw new NullArgumentException("cmd is null");
     }
@@ -374,16 +442,8 @@ public class DistributedUpdateProcessor 
       return;
     }
 
-    boolean leaderForUpdate = isLeader;
-   // leaderForUpdate = req.getParams().getBool(
-   //     VersionProcessorFactory.SEEN_LEADER, false); // TODO: we need a better
-                                                    // indicator of when an
-                                                    // update comes from a
-                                                    // leader
 
     if (forwardToLeader) {
-      // TODO: forward update to the leader
-
       return;
     }
 
@@ -395,12 +455,12 @@ public class DistributedUpdateProcessor 
     Long versionOnUpdate = versionOnUpdateS == null ? null : Long.parseLong(versionOnUpdateS);
 
 
-    VersionBucket bucket = vinfo.bucket(hash(cmd));
+    VersionBucket bucket = vinfo.bucket(hash);
     synchronized (bucket) {
       if (versionsStored) {
         long bucketVersion = bucket.highest;
 
-        if (leaderForUpdate) {
+        if (isLeader) {
           long version = vinfo.getNewClock();
           cmd.setVersion(-version);
           bucket.updateHighest(version);
@@ -445,10 +505,7 @@ public class DistributedUpdateProcessor 
     if (cmd.id != null) {
       doDelete(cmd);
     } else if (cmd.query != null) {
-      // query must be broadcast to all
-      for (int slot = 0; slot < deletes.length; slot++) {
-        doDelete(cmd);
-      }
+      // TODO: query must be broadcast to all ??
       doDelete(cmd);
     }
   }
@@ -474,17 +531,17 @@ public class DistributedUpdateProcessor 
     
     for (int slot = 0; slot < 1; slot++) {
       // piggyback on any outstanding adds or deletes if possible.
-      if (flushAdds(slot, 1, cmd)) continue;
-      if (flushDeletes(slot, 1, cmd)) continue;
+      if (flushAdds(1, cmd)) continue;
+      if (flushDeletes( 1, cmd)) continue;
       
       UpdateRequestExt ureq = new UpdateRequestExt();
       // pass on version
       if (ureq.getParams() == null) {
         ureq.setParams(new ModifiableSolrParams());
       }
-      String seenLeader = req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER);
+      String seenLeader = req.getParams().get(SEEN_LEADER);
       if (seenLeader != null) {
-        ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER, seenLeader);
+        ureq.getParams().add(SEEN_LEADER, seenLeader);
       }
       
       // nocommit: we add the right update chain - we should add the current one?
@@ -506,8 +563,8 @@ public class DistributedUpdateProcessor 
   public void finish() throws IOException {
 
     // piggyback on any outstanding adds or deletes if possible.
-    flushAdds(0, 1, null);
-    flushDeletes(0, 1, null);
+    flushAdds(1, null);
+    flushDeletes(1, null);
 
     checkResponses(true);
     if (next != null && shardStr == null) next.finish();
@@ -576,9 +633,8 @@ public class DistributedUpdateProcessor 
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
   }
   
-  boolean flushAdds(int slot, int limit, CommitUpdateCommand ccmd) {
+  boolean flushAdds(int limit, CommitUpdateCommand ccmd) {
     // check for pending deletes
-    List<AddUpdateCommand> alist = adds[slot];
     if (alist == null || alist.size() < limit) return false;
     
     UpdateRequestExt ureq = new UpdateRequestExt();
@@ -586,9 +642,9 @@ public class DistributedUpdateProcessor 
     if (ureq.getParams() == null) {
       ureq.setParams(new ModifiableSolrParams());
     }
-    String seenLeader = req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER);
+    String seenLeader = req.getParams().get(SEEN_LEADER);
     if (seenLeader != null) {
-      ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER, seenLeader);
+      ureq.getParams().add(SEEN_LEADER, seenLeader);
     }
     // nocommit: we add the right update chain - we should add the current one?
     ureq.getParams().add("update.chain", "distrib-update-chain");
@@ -598,14 +654,13 @@ public class DistributedUpdateProcessor 
       ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
     }
     
-    adds[slot] = null;
+    alist = null;
     submit(ureq);
     return true;
   }
   
-  boolean flushDeletes(int slot, int limit, CommitUpdateCommand ccmd) {
+  boolean flushDeletes(int limit, CommitUpdateCommand ccmd) {
     // check for pending deletes
-    List<DeleteUpdateCommand> dlist = deletes[slot];
     if (dlist == null || dlist.size() < limit) return false;
     
     UpdateRequestExt ureq = new UpdateRequestExt();
@@ -614,9 +669,9 @@ public class DistributedUpdateProcessor 
       ureq.setParams(new ModifiableSolrParams());
     }
 
-    String seenLeader = req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER);
+    String seenLeader = req.getParams().get(SEEN_LEADER);
     if (seenLeader != null) {
-      ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER, seenLeader);
+      ureq.getParams().add(SEEN_LEADER, seenLeader);
     }
     
     // nocommit: we add the right update chain - we should add the current one?
@@ -631,7 +686,7 @@ public class DistributedUpdateProcessor 
       }
     }
     
-    deletes[slot] = null;
+    dlist = null;
     submit(ureq);
     return true;
   }
@@ -709,6 +764,27 @@ public class DistributedUpdateProcessor 
     }
   }
   
+  private String addReplicas(SolrQueryRequest req, String collection,
+      String shardId, String shardZkNodeName) {
+    CloudState cloudState = req.getCore().getCoreDescriptor()
+        .getCoreContainer().getZkController().getCloudState();
+    Slice replicas = cloudState.getSlices(collection).get(shardId);
+    Map<String,ZkNodeProps> shardMap = replicas.getShards();
+    //String self = null;
+    StringBuilder replicasUrl = new StringBuilder();
+    for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
+      if (replicasUrl.length() > 0) {
+        replicasUrl.append("|");
+      }
+      String replicaUrl = entry.getValue().get("url");
+      replicasUrl.append(replicaUrl);
+    }
+
+    // we don't currently use self - it does not yet work with the | notation anyhow
+    //params.add("self", self);
+    return replicasUrl.toString();
+  }
+  
   // TODO: move this to AddUpdateCommand/DeleteUpdateCommand and cache it? And make the hash
pluggable of course.
   // The hash also needs to be pluggable
   private int hash(AddUpdateCommand cmd) {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java?rev=1203340&r1=1203339&r2=1203340&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
Thu Nov 17 19:22:53 2011
@@ -17,175 +17,24 @@ package org.apache.solr.update.processor
  * limitations under the License.
  */
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.zookeeper.KeeperException;
 
 public class DistributedUpdateProcessorFactory extends
     UpdateRequestProcessorFactory {
-  public static final String SEEN_LEADER = "leader";
-  NamedList args;
-  //List<String> shards;
-  String selfStr;
-  String shardsString;
+
   
   @Override
   public void init(NamedList args) {
-//    selfStr = (String) args.get("self");
-//    Object o = args.get("shards");
-//    if (o != null && o instanceof List) {
-//      shards = (List<String>) o;
-//      shardsString = StrUtils.join((List<String>) o, ',');
-//    } else if (o != null && o instanceof String) {
-//      shards = StrUtils.splitSmart((String) o, ",", true);
-//      shardsString = (String) o;
-//    }
-  }
-  
-//  /** return the list of shards, or null if not configured */
-//  public List<String> getShards() {
-//    return shards;
-//  }
-  
-  public String getShardsString() {
-    return shardsString;
-  }
-  
-  /** return "self", or null if not configured */
-  public String getSelf() {
-    return selfStr;
+
   }
   
   @Override
   public DistributedUpdateProcessor getInstance(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
-    CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
-    boolean isLeader = false;
-    boolean forwardToLeader = false;
-    
-    // TODO: first thing we actually have to do here is get a hash so we can send to the
right shard...
-    // to do that, most of this likely has to move
-    
-    String shardStr = null;
-    // if we are in zk mode...
-    if (coreDesc.getCoreContainer().getZkController() != null) {
-      // the leader is...
-      // TODO: if there is no leader, wait and look again
-      // TODO: we are reading the leader from zk every time - we should cache
-      // this
-      // and watch for changes
-     
-      String collection = coreDesc.getCloudDescriptor().getCollectionName();
-      String shardId = coreDesc.getCloudDescriptor().getShardId();
-      
-      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
-      
-      String leaderNode = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
-          + ZkStateReader.LEADER_ELECT_ZKNODE + "/" + shardId + "/leader";
-      SolrZkClient zkClient = coreDesc.getCoreContainer().getZkController()
-          .getZkClient();
-
-      try {
-        List<String> leaderChildren = zkClient.getChildren(leaderNode, null);
-        if (leaderChildren.size() > 0) {
-          String leader = leaderChildren.get(0);
-          
-          ZkNodeProps zkNodeProps = new ZkNodeProps();
-          byte[] bytes = zkClient
-              .getData(leaderNode + "/" + leader, null, null);
-          zkNodeProps.load(bytes);
-          
-          String leaderUrl = zkNodeProps.get("url");
-          
-          String nodeName = req.getCore().getCoreDescriptor()
-              .getCoreContainer().getZkController().getNodeName();
-          String shardZkNodeName = nodeName + "_" + req.getCore().getName();
 
-          System.out.println("params:" + params);
-          if (params.getBool(SEEN_LEADER, false)) {
-            // we got a version, just go local - add no shards param
-            
-            // still mark if i am the leader though
-            if (shardZkNodeName.equals(leader)) {
-              isLeader = true;
-            }
-            System.out.println(" go local");
-          } else if (shardZkNodeName.equals(leader)) {
-            isLeader = true;
-            // that means I want to forward onto my replicas...
-            // so get the replicas...
-            shardStr = addReplicas(req, collection, shardId,
-                shardZkNodeName);
-            
-            // mark that this req has been to the leader
-            params.set(SEEN_LEADER, true);
-            System.out.println("mark leader seen");
-          } else {
-            // I need to forward onto the leader...
-            // first I must hash...
-            shardStr = leaderUrl;
-            forwardToLeader  = true;
-          }
-          System.out.println("set params on req:" + params);
-          req.setParams(params);
-        }
-      } catch (KeeperException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
-            e);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
-            e);
-      } catch (IOException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
-            e);
-      }
-    }
-    
-//    String shardStr = req.getParams().get("shards");
-//    if (shards == null && shardStr == null) return null;
-    System.out.println("set shards:" + shardStr);
-    return new DistributedUpdateProcessor(shardStr, req, rsp, this, isLeader, forwardToLeader,
next);
+    return new DistributedUpdateProcessor(req, rsp, next);
   }
-
-  private String addReplicas(SolrQueryRequest req, String collection,
-      String shardId, String shardZkNodeName) {
-    CloudState cloudState = req.getCore().getCoreDescriptor()
-        .getCoreContainer().getZkController().getCloudState();
-    Slice replicas = cloudState.getSlices(collection).get(shardId);
-    Map<String,ZkNodeProps> shardMap = replicas.getShards();
-    //String self = null;
-    StringBuilder replicasUrl = new StringBuilder();
-    for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
-      if (replicasUrl.length() > 0) {
-        replicasUrl.append("|");
-      }
-      String replicaUrl = entry.getValue().get("url");
-//      if (shardZkNodeName.equals(entry.getKey())) {
-//        self = replicaUrl;
-//      }
-      replicasUrl.append(replicaUrl);
-    }
-
-    // we don't currently use self - it does not yet work with the | notation anyhow
-    //params.add("self", self);
-    return replicasUrl.toString();
-  }
-
+  
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1203340&r1=1203339&r2=1203340&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
Thu Nov 17 19:22:53 2011
@@ -412,7 +412,7 @@ public class FullDistributedZkTest exten
     // TODO: this is failing because the counts per shard don't add up to the control - distrib
total
     // counts do match, so the same doc (same id) must be on different shards.
     // our hash is not stable yet in distrib update proc
-    //assertDocCounts();
+    assertDocCounts();
 
     // kill a shard
     JettySolrRunner deadShard = killShard("shard2", 0);



Mime
View raw message