lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cpoersc...@apache.org
Subject [29/50] [abbrv] lucene-solr:jira/solr-9045: SOLR-10254: significantTerms Streaming Expression should work in non-SolrCloud mode
Date Mon, 13 Mar 2017 15:40:24 GMT
SOLR-10254: significantTerms Streaming Expression should work in non-SolrCloud mode


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/682c6a7d
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/682c6a7d
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/682c6a7d

Branch: refs/heads/jira/solr-9045
Commit: 682c6a7d5145129e8ae01ff00505ddf5a564d396
Parents: 8756be0
Author: Joel Bernstein <jbernste@apache.org>
Authored: Wed Mar 8 21:10:56 2017 -0500
Committer: Joel Bernstein <jbernste@apache.org>
Committed: Wed Mar 8 21:11:26 2017 -0500

----------------------------------------------------------------------
 .../org/apache/solr/handler/StreamHandler.java  |  27 +++
 .../solrj/io/stream/SignificantTermsStream.java |  49 +---
 .../client/solrj/io/stream/TupleStream.java     |  94 ++++++++
 .../solrj/io/stream/StreamExpressionTest.java   | 234 +++++++++++++------
 4 files changed, 285 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/682c6a7d/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 31b37e7..06e59b6 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -18,6 +18,7 @@ package org.apache.solr.handler;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -246,6 +247,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
     int worker = params.getInt("workerID", 0);
     int numWorkers = params.getInt("numWorkers", 1);
     StreamContext context = new StreamContext();
+    context.put("shards", getCollectionShards(params));
     context.workerID = worker;
     context.numWorkers = numWorkers;
     context.setSolrClientCache(clientCache);
@@ -509,4 +511,29 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       return tuple;
     }
   }
+
+  private Map<String, List<String>> getCollectionShards(SolrParams params) {
+
+    Map<String, List<String>> collectionShards = new HashMap();
+    Iterator<String> paramsIt = params.getParameterNamesIterator();
+    while(paramsIt.hasNext()) {
+      String param = paramsIt.next();
+      if(param.indexOf(".shards") > -1) {
+        String collection = param.split("\\.")[0];
+        String shardString = params.get(param);
+        String[] shards = shardString.split(",");
+        List<String> shardList = new ArrayList();
+        for(String shard : shards) {
+          shardList.add(shard);
+        }
+        collectionShards.put(collection, shardList);
+      }
+    }
+
+    if(collectionShards.size() > 0) {
+      return collectionShards;
+    } else {
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/682c6a7d/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
index 87b5a9f..2acee51 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
@@ -74,12 +74,9 @@ public class SignificantTermsStream extends TupleStream implements Expressible{
 
   protected transient SolrClientCache cache;
   protected transient boolean isCloseCache;
-  protected transient CloudSolrClient cloudSolrClient;
-
   protected transient StreamContext streamContext;
   protected ExecutorService executorService;
 
-
   public SignificantTermsStream(String zkHost,
                                  String collectionName,
                                  Map params,
@@ -168,12 +165,12 @@ public class SignificantTermsStream extends TupleStream implements Expressible{
     String zkHost = null;
     if(null == zkHostExpression){
       zkHost = factory.getCollectionZkHost(collectionName);
-    }
-    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+    } else if(zkHostExpression.getParameter() instanceof StreamExpressionValue) {
       zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
     }
-    if(null == zkHost){
-      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not
found for collection '%s'",expression,collectionName));
+
+    if(zkHost == null){
+      zkHost = factory.getDefaultZkHost();
     }
 
     // We've got all the required items
@@ -238,47 +235,13 @@ public class SignificantTermsStream extends TupleStream implements Expressible{
       isCloseCache = false;
     }
 
-    this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
-    this.executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("FeaturesSelectionStream"));
+    this.executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("SignificantTermsStream"));
   }
 
   public List<TupleStream> children() {
     return null;
   }
 
-  private List<String> getShardUrls() throws IOException {
-    try {
-      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
-
-      Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader,
false);
-
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Set<String> liveNodes = clusterState.getLiveNodes();
-
-      List<String> baseUrls = new ArrayList<>();
-      for(Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-        List<Replica> shuffler = new ArrayList<>();
-        for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
{
-            shuffler.add(replica);
-          }
-        }
-
-        Collections.shuffle(shuffler, new Random());
-        Replica rep = shuffler.get(0);
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
-        baseUrls.add(url);
-      }
-
-      return baseUrls;
-
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
   private List<Future<NamedList>> callShards(List<String> baseUrls) throws
IOException {
 
     List<Future<NamedList>> futures = new ArrayList<>();
@@ -326,7 +289,7 @@ public class SignificantTermsStream extends TupleStream implements Expressible{
         Map<String, int[]> mergeFreqs = new HashMap<>();
         long numDocs = 0;
         long resultCount = 0;
-        for (Future<NamedList> getTopTermsCall : callShards(getShardUrls())) {
+        for (Future<NamedList> getTopTermsCall : callShards(getShards(zkHost, collection,
streamContext))) {
           NamedList resp = getTopTermsCall.get();
 
           List<String> terms = (List<String>)resp.get("sterms");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/682c6a7d/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 49a806f..ceea6af 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -19,9 +19,16 @@ package org.apache.solr.client.solrj.io.stream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.Map;
 
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -29,6 +36,14 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.IteratorWriter;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.StrUtils;
 
 
 public abstract class TupleStream implements Closeable, Serializable, MapWriter {
@@ -84,4 +99,83 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
   public UUID getStreamNodeId(){
     return streamNodeId;
   }
+
+  public static List<String> getShards(String zkHost,
+                                       String collection,
+                                       StreamContext streamContext)
+      throws IOException {
+    Map<String, List<String>> shardsMap = null;
+    List<String> shards = new ArrayList();
+
+    if(streamContext != null) {
+      shardsMap = (Map<String, List<String>>)streamContext.get("shards");
+    }
+
+    if(shardsMap != null) {
+      //Manual Sharding
+      shards = shardsMap.get(collection);
+    } else {
+      //SolrCloud Sharding
+      CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
+      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      Collection<Slice> slices = getSlices(collection, zkStateReader, true);
+      Set<String> liveNodes = clusterState.getLiveNodes();
+      for(Slice slice : slices) {
+        Collection<Replica> replicas = slice.getReplicas();
+        List<Replica> shuffler = new ArrayList<>();
+        for(Replica replica : replicas) {
+          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+            shuffler.add(replica);
+        }
+
+        Collections.shuffle(shuffler, new Random());
+        Replica rep = shuffler.get(0);
+        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
+        String url = zkProps.getCoreUrl();
+        shards.add(url);
+      }
+    }
+
+    return shards;
+  }
+
+  public static Collection<Slice> getSlices(String collectionName,
+                                            ZkStateReader zkStateReader,
+                                            boolean checkAlias) throws IOException {
+    ClusterState clusterState = zkStateReader.getClusterState();
+
+    Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
+
+    // Check collection case sensitive
+    if(collectionsMap.containsKey(collectionName)) {
+      return collectionsMap.get(collectionName).getActiveSlices();
+    }
+
+    // Check collection case insensitive
+    for(String collectionMapKey : collectionsMap.keySet()) {
+      if(collectionMapKey.equalsIgnoreCase(collectionName)) {
+        return collectionsMap.get(collectionMapKey).getActiveSlices();
+      }
+    }
+
+    if(checkAlias) {
+      // check for collection alias
+      Aliases aliases = zkStateReader.getAliases();
+      String alias = aliases.getCollectionAlias(collectionName);
+      if (alias != null) {
+        Collection<Slice> slices = new ArrayList<>();
+
+        List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
+        for (String aliasCollectionName : aliasList) {
+          // Add all active slices for this alias collection
+          slices.addAll(collectionsMap.get(aliasCollectionName).getActiveSlices());
+        }
+
+        return slices;
+      }
+    }
+
+    throw new IOException("Slices not found for " + collectionName);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/682c6a7d/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 30b7056..c61e443 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -335,7 +335,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     tuples = getTuples(stream);
     
     assert(tuples.size() == 4);
-    assertOrder(tuples, 4,3,1,2);
+    assertOrder(tuples, 4, 3, 1, 2);
     
     // Basic w/multi comp
     expression = StreamExpressionParser.parse("unique(search(" + COLLECTIONORALIAS + ", q=*:*,
fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")");
@@ -1577,7 +1577,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     List<Tuple> tuples = getTuples(pstream);
     assert(tuples.size() == 5);
-    assertOrder(tuples, 0,1,3,4,6);
+    assertOrder(tuples, 0, 1, 3, 4, 6);
 
     //Test the eofTuples
 
@@ -4712,8 +4712,6 @@ public class StreamExpressionTest extends SolrCloudTestCase {
   @Test
   public void testSignificantTermsStream() throws Exception {
 
-    Assume.assumeTrue(!useAlias);
-
     UpdateRequest updateRequest = new UpdateRequest();
     for (int i = 0; i < 5000; i++) {
       updateRequest.add(id, "a"+i, "test_t", "a b c d m l");
@@ -4742,106 +4740,186 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     StreamFactory factory = new StreamFactory()
         .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+        .withDefaultZkHost(cluster.getZkServer().getZkAddress())
         .withFunctionName("significantTerms", SignificantTermsStream.class);
 
-    String significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\",
limit=3, minTermLength=1, maxDocFreq=\".5\")";
-    stream = factory.constructStream(significantTerms);
-    tuples = getTuples(stream);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache cache = new SolrClientCache();
+    streamContext.setSolrClientCache(cache);
+    try {
 
-    assert(tuples.size() == 3);
-    assertTrue(tuples.get(0).get("term").equals("l"));
-    assertTrue(tuples.get(0).getLong("background") == 5000);
-    assertTrue(tuples.get(0).getLong("foreground") == 5000);
+      String significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\",
limit=3, minTermLength=1, maxDocFreq=\".5\")";
+      stream = factory.constructStream(significantTerms);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
+      assert (tuples.size() == 3);
+      assertTrue(tuples.get(0).get("term").equals("l"));
+      assertTrue(tuples.get(0).getLong("background") == 5000);
+      assertTrue(tuples.get(0).getLong("foreground") == 5000);
 
-    assertTrue(tuples.get(1).get("term").equals("m"));
-    assertTrue(tuples.get(1).getLong("background") == 5500);
-    assertTrue(tuples.get(1).getLong("foreground") == 5000);
 
-    assertTrue(tuples.get(2).get("term").equals("d"));
-    assertTrue(tuples.get(2).getLong("background") == 5600);
-    assertTrue(tuples.get(2).getLong("foreground") == 5000);
+      assertTrue(tuples.get(1).get("term").equals("m"));
+      assertTrue(tuples.get(1).getLong("background") == 5500);
+      assertTrue(tuples.get(1).getLong("foreground") == 5000);
 
-    //Test maxDocFreq
-    significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=3,
maxDocFreq=2650, minTermLength=1)";
-    stream = factory.constructStream(significantTerms);
-    tuples = getTuples(stream);
+      assertTrue(tuples.get(2).get("term").equals("d"));
+      assertTrue(tuples.get(2).getLong("background") == 5600);
+      assertTrue(tuples.get(2).getLong("foreground") == 5000);
 
-    assert(tuples.size() == 1);
-    assertTrue(tuples.get(0).get("term").equals("l"));
-    assertTrue(tuples.get(0).getLong("background") == 5000);
-    assertTrue(tuples.get(0).getLong("foreground") == 5000);
+      //Test maxDocFreq
+      significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=3,
maxDocFreq=2650, minTermLength=1)";
+      stream = factory.constructStream(significantTerms);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    //Test maxDocFreq percentage
+      assert (tuples.size() == 1);
+      assertTrue(tuples.get(0).get("term").equals("l"));
+      assertTrue(tuples.get(0).getLong("background") == 5000);
+      assertTrue(tuples.get(0).getLong("foreground") == 5000);
 
-    significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=3,
maxDocFreq=\".45\", minTermLength=1)";
-    stream = factory.constructStream(significantTerms);
-    tuples = getTuples(stream);
-    assert(tuples.size() == 1);
-    assertTrue(tuples.get(0).get("term").equals("l"));
-    assertTrue(tuples.get(0).getLong("background") == 5000);
-    assertTrue(tuples.get(0).getLong("foreground") == 5000);
+      //Test maxDocFreq percentage
 
+      significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=3,
maxDocFreq=\".45\", minTermLength=1)";
+      stream = factory.constructStream(significantTerms);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 1);
+      assertTrue(tuples.get(0).get("term").equals("l"));
+      assertTrue(tuples.get(0).getLong("background") == 5000);
+      assertTrue(tuples.get(0).getLong("foreground") == 5000);
 
-    //Test min doc freq
-    significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=3,
minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")";
-    stream = factory.constructStream(significantTerms);
-    tuples = getTuples(stream);
 
-    assert(tuples.size() == 3);
+      //Test min doc freq
+      significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=3,
minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")";
+      stream = factory.constructStream(significantTerms);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    assertTrue(tuples.get(0).get("term").equals("m"));
-    assertTrue(tuples.get(0).getLong("background") == 5500);
-    assertTrue(tuples.get(0).getLong("foreground") == 5000);
+      assert (tuples.size() == 3);
 
-    assertTrue(tuples.get(1).get("term").equals("d"));
-    assertTrue(tuples.get(1).getLong("background") == 5600);
-    assertTrue(tuples.get(1).getLong("foreground") == 5000);
+      assertTrue(tuples.get(0).get("term").equals("m"));
+      assertTrue(tuples.get(0).getLong("background") == 5500);
+      assertTrue(tuples.get(0).getLong("foreground") == 5000);
 
-    assertTrue(tuples.get(2).get("term").equals("c"));
-    assertTrue(tuples.get(2).getLong("background") == 5900);
-    assertTrue(tuples.get(2).getLong("foreground") == 5000);
+      assertTrue(tuples.get(1).get("term").equals("d"));
+      assertTrue(tuples.get(1).getLong("background") == 5600);
+      assertTrue(tuples.get(1).getLong("foreground") == 5000);
 
+      assertTrue(tuples.get(2).get("term").equals("c"));
+      assertTrue(tuples.get(2).getLong("background") == 5900);
+      assertTrue(tuples.get(2).getLong("foreground") == 5000);
 
-    //Test min doc freq percent
-    significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=3,
minDocFreq=\".478\", minTermLength=1, maxDocFreq=\".5\")";
-    stream = factory.constructStream(significantTerms);
-    tuples = getTuples(stream);
 
-    assert(tuples.size() == 1);
+      //Test min doc freq percent
+      significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=3,
minDocFreq=\".478\", minTermLength=1, maxDocFreq=\".5\")";
+      stream = factory.constructStream(significantTerms);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    assertTrue(tuples.get(0).get("term").equals("c"));
-    assertTrue(tuples.get(0).getLong("background") == 5900);
-    assertTrue(tuples.get(0).getLong("foreground") == 5000);
+      assert (tuples.size() == 1);
 
+      assertTrue(tuples.get(0).get("term").equals("c"));
+      assertTrue(tuples.get(0).getLong("background") == 5900);
+      assertTrue(tuples.get(0).getLong("foreground") == 5000);
 
-    //Test limit
+      //Test limit
 
-    significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=2,
minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")";
-    stream = factory.constructStream(significantTerms);
-    tuples = getTuples(stream);
+      significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=2,
minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")";
+      stream = factory.constructStream(significantTerms);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
 
-    assert(tuples.size() == 2);
+      assert (tuples.size() == 2);
 
-    assertTrue(tuples.get(0).get("term").equals("m"));
-    assertTrue(tuples.get(0).getLong("background") == 5500);
-    assertTrue(tuples.get(0).getLong("foreground") == 5000);
+      assertTrue(tuples.get(0).get("term").equals("m"));
+      assertTrue(tuples.get(0).getLong("background") == 5500);
+      assertTrue(tuples.get(0).getLong("foreground") == 5000);
 
-    assertTrue(tuples.get(1).get("term").equals("d"));
-    assertTrue(tuples.get(1).getLong("background") == 5600);
-    assertTrue(tuples.get(1).getLong("foreground") == 5000);
+      assertTrue(tuples.get(1).get("term").equals("d"));
+      assertTrue(tuples.get(1).getLong("background") == 5600);
+      assertTrue(tuples.get(1).getLong("foreground") == 5000);
 
-    //Test term length
+      //Test term length
 
-    significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=2,
minDocFreq=\"2700\", minTermLength=2)";
-    stream = factory.constructStream(significantTerms);
-    tuples = getTuples(stream);
-    assert(tuples.size() == 0);
+      significantTerms = "significantTerms(collection1, q=\"id:a*\",  field=\"test_t\", limit=2,
minDocFreq=\"2700\", minTermLength=2)";
+      stream = factory.constructStream(significantTerms);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assert (tuples.size() == 0);
 
-  }
+
+      //Test with shards parameter
+      List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(),
COLLECTIONORALIAS, streamContext);
+
+      Map<String, List<String>> shardsMap = new HashMap();
+      shardsMap.put("myCollection", shardUrls);
+      StreamContext context = new StreamContext();
+      context.put("shards", shardsMap);
+      context.setSolrClientCache(cache);
+      significantTerms = "significantTerms(myCollection, q=\"id:a*\",  field=\"test_t\",
limit=2, minDocFreq=\"2700\", minTermLength=1, maxDocFreq=\".5\")";
+      stream = factory.constructStream(significantTerms);
+      stream.setStreamContext(context);
+      tuples = getTuples(stream);
+
+      assert (tuples.size() == 2);
+
+      assertTrue(tuples.get(0).get("term").equals("m"));
+      assertTrue(tuples.get(0).getLong("background") == 5500);
+      assertTrue(tuples.get(0).getLong("foreground") == 5000);
+
+      assertTrue(tuples.get(1).get("term").equals("d"));
+      assertTrue(tuples.get(1).getLong("background") == 5600);
+      assertTrue(tuples.get(1).getLong("foreground") == 5000);
+
+      //Execersise the /stream hander
+
+      //Add the shards http parameter for the myCollection
+      StringBuilder buf = new StringBuilder();
+      for (String shardUrl : shardUrls) {
+        if (buf.length() > 0) {
+          buf.append(",");
+        }
+        buf.append(shardUrl);
+      }
+
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", significantTerms);
+      solrParams.add("myCollection.shards", buf.toString());
+      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      tuples = getTuples(solrStream);
+      assert (tuples.size() == 2);
+
+      assertTrue(tuples.get(0).get("term").equals("m"));
+      assertTrue(tuples.get(0).getLong("background") == 5500);
+      assertTrue(tuples.get(0).getLong("foreground") == 5000);
+
+      assertTrue(tuples.get(1).get("term").equals("d"));
+      assertTrue(tuples.get(1).getLong("background") == 5600);
+      assertTrue(tuples.get(1).getLong("foreground") == 5000);
+
+      //Add a negative test to prove that it cannot find slices if shards parameter is removed
+
+      try {
+        ModifiableSolrParams solrParamsBad = new ModifiableSolrParams();
+        solrParamsBad.add("qt", "/stream");
+        solrParamsBad.add("expr", significantTerms);
+        solrStream = new SolrStream(shardUrls.get(0), solrParamsBad);
+        tuples = getTuples(solrStream);
+        throw new Exception("Exception should have been thrown above");
+      } catch (IOException e) {
+        assertTrue(e.getMessage().contains("Slices not found for myCollection"));
+      }
+    } finally {
+      cache.close();
+    }
 
 
 
+
+  }
+
   @Test
   public void testComplementStream() throws Exception {
 
@@ -4920,12 +4998,16 @@ public class StreamExpressionTest extends SolrCloudTestCase {
   }
 
   protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
-    tupleStream.open();
     List<Tuple> tuples = new ArrayList<Tuple>();
-    for(Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
-      tuples.add(t);
+
+    try {
+      tupleStream.open();
+      for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
+        tuples.add(t);
+      }
+    } finally {
+      tupleStream.close();
     }
-    tupleStream.close();
     return tuples;
   }
   protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {


Mime
View raw message