lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1204029 - in /lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr: cloud/HashPartitioner.java update/processor/DistributedUpdateProcessor.java
Date Sat, 19 Nov 2011 17:55:19 GMT
Author: markrmiller
Date: Sat Nov 19 17:55:19 2011
New Revision: 1204029

URL: http://svn.apache.org/viewvc?rev=1204029&view=rev
Log:
replace the hard coded hokey hash->shard mapping with hokey working code

Added:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/HashPartitioner.java
  (with props)
Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/HashPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/HashPartitioner.java?rev=1204029&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/HashPartitioner.java
(added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/HashPartitioner.java
Sat Nov 19 17:55:19 2011
@@ -0,0 +1,61 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class HashPartitioner {
+  
+  public static class Range {
+    public long min;
+    public long max;
+    
+    public Range(long min, long max) {
+      this.min = min;
+      this.max = max;
+    }
+  }
+  
+  public List<Range> partitionRange(int partitions) {
+    // some hokey code to partition the int space
+    long range = Integer.MAX_VALUE + (Math.abs((long)Integer.MIN_VALUE));
+    long srange = range / partitions;
+    
+    System.out.println("min:" + Integer.MIN_VALUE);
+    System.out.println("max:" + Integer.MAX_VALUE);
+    
+    System.out.println("range:" + range);
+    System.out.println("srange:" + srange);
+    
+    List<Range> ranges = new ArrayList<Range>(partitions);
+    
+    long end = 0;
+    long start = Integer.MIN_VALUE;
+
+    while (end < Integer.MAX_VALUE) {
+      end = start + srange;
+      System.out.println("from:" + start + ":" + end);
+      start = end + 1L;
+      ranges.add(new Range(start, end));
+    }
+    
+    return ranges;
+  }
+}

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1204029&r1=1204028&r2=1204029&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
(original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Sat Nov 19 17:55:19 2011
@@ -18,13 +18,18 @@ package org.apache.solr.update.processor
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.lang.NullArgumentException;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
+import org.apache.solr.cloud.HashPartitioner;
+import org.apache.solr.cloud.HashPartitioner.Range;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.cloud.CloudState;
@@ -76,6 +81,8 @@ public class DistributedUpdateProcessor 
   private final SchemaField idField;
   
   private final SolrCmdDistributor cmdDistrib;
+
+  private HashPartitioner hp;
   
   public DistributedUpdateProcessor(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -103,18 +110,18 @@ public class DistributedUpdateProcessor 
     System.out.println("hash:" + hash);
     CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
     
-    String shardId = getShard(hash); // get the right shard based on the hash...
+    CloudState cloudState = req.getCore().getCoreDescriptor().getCoreContainer().getZkController().getCloudState();
+    
+    String collection = coreDesc.getCloudDescriptor().getCollectionName();
+    String shardId = getShard(hash, collection, cloudState); // get the right shard based
on the hash...
 
     // if we are in zk mode...
     if (coreDesc.getCoreContainer().getZkController() != null) {
       // the leader is...
       // TODO: if there is no leader, wait and look again
       // TODO: we are reading the leader from zk every time - we should cache
-      // this and watch for changes??
-     
-      String collection = coreDesc.getCloudDescriptor().getCollectionName();
+      // this and watch for changes?? Just pull it from ZkController cluster state probably?
 
-      
       ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
       
       String leaderNode = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
@@ -178,15 +185,25 @@ public class DistributedUpdateProcessor 
     }
   }
   
-  private String getShard(int hash) {
-    if (hash < -715827884) {
-      return "shard1";
-    } else if (hash < 715827881) {
-      return "shard2";
-    } else {
-      return "shard3";
+  private String getShard(int hash, String collection, CloudState cloudState) {
+    // nocommit: we certainly don't want to do this every update request...
+    // get the shard names
+    Map<String,Slice> slices = cloudState.getSlices(collection);
+    Set<String> shards = slices.keySet();
+    List<String> shardList = new ArrayList<String>();
+    shardList.addAll(shards);
+    Collections.sort(shardList);
+    hp = new HashPartitioner();
+    List<Range> ranges = hp.partitionRange(shards.size());
+    int cnt = 0;
+    for (Range range : ranges) {
+      if (hash < range.max) {
+        return shardList.get(cnt);
+      }
+      cnt++;
     }
-      
+    
+    throw new IllegalStateException("The HashPartitioner failed");
   }
 
   @Override



Mime
View raw message