lucene-commits mailing list archives

From er...@apache.org
Subject [1/2] lucene-solr:branch_6x: SOLR-8467: CloudSolrStream and FacetStream should take a SolrParams object rather than a Map<String, String> to allow more complex Solr queries to be specified
Date Mon, 09 May 2016 20:37:27 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 75a84b7d2 -> 73b4defc0


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index b1659c9..dd02175 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -36,7 +36,6 @@ import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
-import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -56,6 +55,9 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 
@@ -72,16 +74,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
   protected String zkHost;
   protected String collection;
-  protected Map<String,String> params;
+  protected SolrParams params;
   private Map<String, String> fieldMappings;
   protected StreamComparator comp;
-  private int zkConnectTimeout = 10000;
-  private int zkClientTimeout = 10000;
-  private int numWorkers;
-  private int workerID;
   private boolean trace;
   protected transient Map<String, Tuple> eofTuples;
-  protected transient SolrClientCache cache;
   protected transient CloudSolrClient cloudSolrClient;
   protected transient List<TupleStream> solrStreams;
   protected transient TreeSet<TupleWrapper> tuples;
@@ -91,7 +88,34 @@ public class CloudSolrStream extends TupleStream implements Expressible {
   protected CloudSolrStream(){
     
   }
+
+
+  /**
+   * @param zkHost         Zookeeper ensemble connection string
+   * @param collectionName Name of the collection to operate on
+   * @param params         Map&lt;String, String&gt; of parameter/value pairs
+   * @throws IOException if the stream cannot be constructed
+   *                     <p>
+   *                     This form cannot specify a parameter more than once (for example multiple "fq" clauses);
+   *                     use the form that takes a SolrParams. Transition code can call the preferred constructor
+   *                     by calling CloudSolrStream(zkHost, collectionName,
+   *                     new ModifiableSolrParams(SolrParams.toMultiMap(new NamedList(Map&lt;String, String&gt;))))
+   * @deprecated         Use the constructor that takes a SolrParams object rather than a Map
+   */
+
+  @Deprecated
   public CloudSolrStream(String zkHost, String collectionName, Map params) throws IOException {
+    init(collectionName, zkHost, new MapSolrParams(params));
+  }
+
+  /**
+   * @param zkHost         Zookeeper ensemble connection string
+   * @param collectionName Name of the collection to operate on
+   * @param params         SolrParams of parameter/value pairs
+   * @throws IOException if the stream cannot be constructed
+   */
+
+  public CloudSolrStream(String zkHost, String collectionName, SolrParams params) throws IOException {
     init(collectionName, zkHost, params);
   }
 
@@ -117,16 +141,16 @@ public class CloudSolrStream extends TupleStream implements Expressible {
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
     }
     
-    Map<String,String> params = new HashMap<String,String>();
+    ModifiableSolrParams mParams = new ModifiableSolrParams();
     for(StreamExpressionNamedParameter namedParam : namedParams){
       if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
-        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
+        mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
       }
     }
 
     // Aliases, optional, if provided then need to split
     if(null != aliasExpression && aliasExpression.getParameter() instanceof StreamExpressionValue){
-      fieldMappings = new HashMap<String,String>();
+      fieldMappings = new HashMap<>();
       for(String mapping : ((StreamExpressionValue)aliasExpression.getParameter()).getValue().split(",")){
         String[] parts = mapping.trim().split("=");
         if(2 == parts.length){
@@ -154,7 +178,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     }
     
     // We've got all the required items
-    init(collectionName, zkHost, params);
+    init(collectionName, zkHost, mParams);
   }
   
   @Override
@@ -168,14 +192,16 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     expression.addParameter(collection);
     
     // parameters
-    for(Entry<String,String> param : params.entrySet()){
-      String value = param.getValue();
+
+    ModifiableSolrParams mParams = new ModifiableSolrParams(SolrParams.toMultiMap(params.toNamedList()));
+    for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
+      String value = String.join(",", param.getValue());
       
       // SOLR-8409: This is a special case where the params contain a " character
       // Do note that in any other BASE streams with parameters where a " might come into play
       // that this same replacement needs to take place.
       value = value.replace("\"", "\\\"");
-      
+
       expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), value));
     }
     
@@ -213,29 +239,34 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
     child.setImplementingClass("Solr/Lucene");
     child.setExpressionType(ExpressionType.DATASTORE);
+    
     if(null != params){
-      child.setExpression(params.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
     }
     explanation.addChild(child);
     
     return explanation;
   }
-  
-  private void init(String collectionName, String zkHost, Map params) throws IOException {
+
+  private void init(String collectionName, String zkHost, SolrParams params) throws IOException {
     this.zkHost = zkHost;
     this.collection = collectionName;
-    this.params = params;
+    this.params = new ModifiableSolrParams(params);
 
     // If the comparator is null then it was not explicitly set so we will create one using the sort parameter
     // of the query. While doing this we will also take into account any aliases such that if we are sorting on
     // fieldA but fieldA is aliased to alias.fieldA then the comparater will be against alias.fieldA.
-    if(!params.containsKey("fl")){
+    String fls = String.join(",", params.getParams("fl"));
+    if (fls == null) {
       throw new IOException("fl param expected for a stream");
     }
-    if(!params.containsKey("sort")){
+
+    String sorts = String.join(",", params.getParams("sort"));
+    if (sorts == null) {
       throw new IOException("sort param expected for a stream");
     }
-    this.comp = parseComp((String)params.get("sort"), (String)params.get("fl")); 
+    this.comp = parseComp(sorts, fls);
   }
   
   public void setFieldMappings(Map<String, String> fieldMappings) {
@@ -247,9 +278,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
   }
 
   public void setStreamContext(StreamContext context) {
-    this.numWorkers = context.numWorkers;
-    this.workerID = context.workerID;
-    this.cache = context.getSolrClientCache();
     this.streamContext = context;
   }
 
@@ -261,8 +289,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     this.tuples = new TreeSet();
     this.solrStreams = new ArrayList();
     this.eofTuples = Collections.synchronizedMap(new HashMap());
-    if(this.cache != null) {
-      this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
+    if (this.streamContext != null && this.streamContext.getSolrClientCache() != null) {
+      this.cloudSolrClient = this.streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
     } else {
       this.cloudSolrClient = new Builder()
           .withZkHost(zkHost)
@@ -345,7 +373,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
         }
       }
 
-      params.put("distrib","false"); // We are the aggregator.
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params); 
+      mParams.set("distrib", "false"); // We are the aggregator.
 
       for(Slice slice : slices) {
         Collection<Replica> replicas = slice.getReplicas();
@@ -359,7 +388,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
         Replica rep = shuffler.get(0);
         ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
         String url = zkProps.getCoreUrl();
-        SolrStream solrStream = new SolrStream(url, params);
+        SolrStream solrStream = new SolrStream(url, mParams);
         if(streamContext != null) {
           solrStream.setStreamContext(streamContext);
         }
@@ -406,7 +435,9 @@ public class CloudSolrStream extends TupleStream implements Expressible {
       }
     }
 
-    if(cache == null && cloudSolrClient != null) {
+    if ((this.streamContext == null || this.streamContext.getSolrClientCache() == null) &&
+        cloudSolrClient != null) {
+
       cloudSolrClient.close();
     }
   }

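A minimal migration sketch for callers of CloudSolrStream (illustrative only; the zkHost value and collection name are placeholders). The multiple "fq" values show what the deprecated Map form could not express:

import java.io.IOException;

import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class CloudSolrStreamMigration {
  // Builds a CloudSolrStream using the new SolrParams-based constructor.
  public static CloudSolrStream openStream(String zkHost) throws IOException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("q", "*:*");
    params.set("fl", "id,a_s,a_i");
    params.set("sort", "a_i asc");
    // Multiple fq clauses are now possible; the Map<String, String> form allowed only one.
    params.add("fq", "a_s:hello0");
    params.add("fq", "a_s:hello1");
    return new CloudSolrStream(zkHost, "collection1", params);
  }
}
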
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index ceaf13c..ae04a85 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -46,7 +46,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 
 /**
@@ -65,11 +67,16 @@ public class FacetStream extends TupleStream implements Expressible  {
   private List<Tuple> tuples = new ArrayList<Tuple>();
   private int index;
   private String zkHost;
-  private Map<String, String> props;
+  private SolrParams params;
   private String collection;
   protected transient SolrClientCache cache;
   protected transient CloudSolrClient cloudSolrClient;
 
+  /**
+   *
+   * @deprecated Use the form that takes a SolrParams rather than Map&lt;String, String&gt;
+   */
+  @Deprecated
   public FacetStream(String zkHost,
                      String collection,
                      Map<String, String> props,
@@ -77,7 +84,17 @@ public class FacetStream extends TupleStream implements Expressible  {
                      Metric[] metrics,
                      FieldComparator[] bucketSorts,
                      int bucketSizeLimit) throws IOException {
-    init(collection, props, buckets, bucketSorts, metrics, bucketSizeLimit, zkHost);
+    init(collection, new MapSolrParams(props), buckets, bucketSorts, metrics, bucketSizeLimit, zkHost);
+  }
+
+  public FacetStream(String zkHost,
+                     String collection,
+                     SolrParams params,
+                     Bucket[] buckets,
+                     Metric[] metrics,
+                     FieldComparator[] bucketSorts,
+                     int bucketSizeLimit) throws IOException {
+    init(collection, params, buckets, bucketSorts, metrics, bucketSizeLimit, zkHost);
   }
   
   public FacetStream(StreamExpression expression, StreamFactory factory) throws IOException{   
@@ -106,10 +123,10 @@ public class FacetStream extends TupleStream implements Expressible  {
     }
     
     // pull out known named params
-    Map<String,String> params = new HashMap<String,String>();
+    ModifiableSolrParams params = new ModifiableSolrParams();
     for(StreamExpressionNamedParameter namedParam : namedParams){
       if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("buckets") && !namedParam.getName().equals("bucketSorts") && !namedParam.getName().equals("limit")){
-        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
+        params.add(namedParam.getName(), namedParam.getParameter().toString().trim());
       }
     }
 
@@ -204,10 +221,10 @@ public class FacetStream extends TupleStream implements Expressible  {
 
     return comps;
   }
-  
-  private void init(String collection, Map<String, String> props, Bucket[] buckets, FieldComparator[] bucketSorts, Metric[] metrics, int bucketSizeLimit, String zkHost) throws IOException {
+
+  private void init(String collection, SolrParams params, Bucket[] buckets, FieldComparator[] bucketSorts, Metric[] metrics, int bucketSizeLimit, String zkHost) throws IOException {
     this.zkHost  = zkHost;
-    this.props   = props;
+    this.params = params;
     this.buckets = buckets;
     this.metrics = metrics;
     this.bucketSizeLimit   = bucketSizeLimit;
@@ -233,8 +250,11 @@ public class FacetStream extends TupleStream implements Expressible  {
     expression.addParameter(collection);
     
     // parameters
-    for(Entry<String,String> param : props.entrySet()){
-      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
+    ModifiableSolrParams tmpParams = new ModifiableSolrParams(params);
+
+    for (Entry<String, String[]> param : tmpParams.getMap().entrySet()) {
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(),
+          String.join(",", param.getValue())));
     }
     
     // buckets
@@ -288,8 +308,10 @@ public class FacetStream extends TupleStream implements Expressible  {
     // parallel stream.
     
     child.setImplementingClass("Solr/Lucene");
-    child.setExpressionType(ExpressionType.DATASTORE);    
-    child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+    child.setExpressionType(ExpressionType.DATASTORE);
+    ModifiableSolrParams tmpParams = new ModifiableSolrParams(SolrParams.toMultiMap(params.toNamedList()));
+
+    child.setExpression(tmpParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
     
     explanation.addChild(child);
     
@@ -301,8 +323,7 @@ public class FacetStream extends TupleStream implements Expressible  {
   }
 
   public List<TupleStream> children() {
-    List<TupleStream> l =  new ArrayList();
-    return l;
+    return new ArrayList();
   }
 
   public void open() throws IOException {
@@ -317,11 +338,11 @@ public class FacetStream extends TupleStream implements Expressible  {
     FieldComparator[] adjustedSorts = adjustSorts(buckets, bucketSorts);
     String json = getJsonFacetString(buckets, metrics, adjustedSorts, bucketSizeLimit);
 
-    ModifiableSolrParams params = getParams(this.props);
-    params.add("json.facet", json);
-    params.add("rows", "0");
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams(params);
+    paramsLoc.set("json.facet", json);
+    paramsLoc.set("rows", "0");
 
-    QueryRequest request = new QueryRequest(params);
+    QueryRequest request = new QueryRequest(paramsLoc);
     try {
       NamedList response = cloudSolrClient.request(request, collection);
       getTuples(response, buckets, metrics);
@@ -350,15 +371,6 @@ public class FacetStream extends TupleStream implements Expressible  {
     }
   }
 
-  private ModifiableSolrParams getParams(Map<String, String> props) {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    for(String key : props.keySet()) {
-      String value = props.get(key);
-      params.add(key, value);
-    }
-    return params;
-  }
-
   private String getJsonFacetString(Bucket[] _buckets, Metric[] _metrics, FieldComparator[] _sorts, int _limit) {
     StringBuilder buf = new StringBuilder();
     appendJson(buf, _buckets, _metrics, _sorts, _limit, 0);

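Transition code that still builds a Map<String, String> can wrap it before calling the new SolrParams-based constructors. A minimal sketch, assuming single-valued parameters only:

import java.util.HashMap;
import java.util.Map;

import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;

public class LegacyParamsShim {
  // Wraps a single-valued parameter map so it can be passed where SolrParams is expected.
  public static SolrParams wrap(Map<String, String> legacy) {
    return new MapSolrParams(legacy);
  }

  public static void main(String[] args) {
    Map<String, String> legacy = new HashMap<>();
    legacy.put("q", "*:*");
    legacy.put("fl", "id");
    System.out.println(wrap(legacy).get("q")); // prints *:*
  }
}
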
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 9570643..779cc31 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -16,21 +16,15 @@
  */
 package org.apache.solr.client.solrj.io.stream;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Set;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
 import java.util.Random;
 
 import org.apache.solr.client.solrj.io.Tuple;
@@ -49,7 +43,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Base64;
+import org.apache.solr.common.params.ModifiableSolrParams;
 
 /**
  * The ParallelStream decorates a TupleStream implementation and pushes it to N workers for parallel execution.
@@ -287,16 +281,17 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
       Collections.shuffle(shuffler, new Random());
 
       for(int w=0; w<workers; w++) {
-        HashMap params = new HashMap();
-        params.put("distrib","false"); // We are the aggregator.
-        params.put("numWorkers", workers);
-        params.put("workerID", w);
-        params.put("expr", pushStream);
-        params.put("qt","/stream");
+        ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+        paramsLoc.set("distrib","false"); // We are the aggregator.
+        paramsLoc.set("numWorkers", workers);
+        paramsLoc.set("workerID", w);
+
+        paramsLoc.set("expr", pushStream.toString());
+        paramsLoc.set("qt","/stream");
         Replica rep = shuffler.get(w);
         ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
         String url = zkProps.getCoreUrl();
-        SolrStream solrStream = new SolrStream(url, params);
+        SolrStream solrStream = new SolrStream(url, paramsLoc);
         solrStreams.add(solrStream);
       }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index f4aabec..f80370f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -32,6 +32,7 @@ import org.apache.solr.client.solrj.io.stream.expr.Explanation;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.slf4j.Logger;
@@ -48,7 +49,7 @@ public class SolrStream extends TupleStream {
   private static final long serialVersionUID = 1;
 
   private String baseUrl;
-  private Map params;
+  private SolrParams params;
   private int numWorkers;
   private int workerID;
   private boolean trace;
@@ -59,8 +60,26 @@ public class SolrStream extends TupleStream {
   private String slice;
   private long checkpoint = -1;
 
+  /**
+   * @param baseUrl Base URL of the stream.
+   * @param params  Map&lt;String, String&gt; of parameters
+   * @deprecated Use the form that takes SolrParams. Existing code can use
+   * new ModifiableSolrParams(SolrParams.toMultiMap(new NamedList(params)))
+   * for existing calls that use Map&lt;String, String&gt;
+   */
+  @Deprecated
   public SolrStream(String baseUrl, Map params) {
     this.baseUrl = baseUrl;
+    this.params = new ModifiableSolrParams(new MapSolrParams(params));
+  }
+
+  /**
+   * @param baseUrl Base URL of the stream.
+   * @param params  SolrParams of parameters
+   */
+
+  public SolrStream(String baseUrl, SolrParams params) {
+    this.baseUrl = baseUrl;
     this.params = params;
   }
 
@@ -118,9 +137,9 @@ public class SolrStream extends TupleStream {
     this.checkpoint = checkpoint;
   }
 
-  private SolrParams loadParams(Map params) throws IOException {
-    ModifiableSolrParams solrParams = new ModifiableSolrParams();
-    if(params.containsKey("partitionKeys")) {
+  private SolrParams loadParams(SolrParams paramsIn) throws IOException {
+    ModifiableSolrParams solrParams = new ModifiableSolrParams(paramsIn);
+    if (params.get("partitionKeys") != null) {
       if(!params.get("partitionKeys").equals("none")) {
         String partitionFilter = getPartitionFilter();
         solrParams.add("fq", partitionFilter);
@@ -135,12 +154,6 @@ public class SolrStream extends TupleStream {
       solrParams.add("fq", "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
     }
 
-    Iterator<Map.Entry> it = params.entrySet().iterator();
-    while(it.hasNext()) {
-      Map.Entry entry = it.next();
-      solrParams.add((String)entry.getKey(), entry.getValue().toString());
-    }
-
     return solrParams;
   }
 

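A minimal sketch of the new SolrStream(String, SolrParams) constructor, mirroring how the tests send a streaming expression to the /stream handler; the baseUrl and the expression text are placeholders:

import java.io.IOException;

import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class SolrStreamExample {
  // Sends a streaming expression to a single node and prints the returned ids.
  public static void run(String baseUrl) throws IOException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("qt", "/stream");
    params.set("expr", "search(collection1, q=\"*:*\", fl=\"id,a_i\", sort=\"a_i asc\")");
    SolrStream stream = new SolrStream(baseUrl, params);
    try {
      stream.open();
      for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
        System.out.println(tuple.getString("id"));
      }
    } finally {
      stream.close();
    }
  }
}
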
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index c128a02..c0f4b43 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -42,7 +42,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 
 public class StatsStream extends TupleStream implements Expressible  {
@@ -52,7 +54,7 @@ public class StatsStream extends TupleStream implements Expressible  {
   private Metric[] metrics;
   private String zkHost;
   private Tuple tuple;
-  private Map<String, String> props;
+  private SolrParams params;
   private String collection;
   private boolean done;
   private long count;
@@ -60,20 +62,29 @@ public class StatsStream extends TupleStream implements Expressible  {
   protected transient SolrClientCache cache;
   protected transient CloudSolrClient cloudSolrClient;
 
+  // Use StatsStream(String, String, SolrParams, Metric[]) instead
+  @Deprecated
   public StatsStream(String zkHost,
                      String collection,
                      Map<String, String> props,
                      Metric[] metrics) {
-    init(zkHost, collection, props, metrics);
+    init(zkHost, collection, new MapSolrParams(props), metrics);
   }
-  
-  private void init(String zkHost, String collection, Map<String, String> props, Metric[] metrics) {
+
+  public StatsStream(String zkHost,
+                     String collection,
+                     SolrParams params,
+                     Metric[] metrics) {
+    init(zkHost, collection, params, metrics);
+  }
+
+  private void init(String zkHost, String collection, SolrParams params, Metric[] metrics) {
     this.zkHost  = zkHost;
-    this.props   = props;
+    this.params = params;
     this.metrics = metrics;
     this.collection = collection;
   }
-  
+
   public StatsStream(StreamExpression expression, StreamFactory factory) throws IOException{   
     // grab all parameters out
     String collectionName = factory.getValueOperand(expression, 0);
@@ -95,11 +106,11 @@ public class StatsStream extends TupleStream implements Expressible  {
     if(0 == namedParams.size()){
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
     }
-    
-    Map<String,String> params = new HashMap<String,String>();
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
     for(StreamExpressionNamedParameter namedParam : namedParams){
       if(!namedParam.getName().equals("zkHost")){
-        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
+        params.set(namedParam.getName(), namedParam.getParameter().toString().trim());
       }
     }
     
@@ -139,8 +150,9 @@ public class StatsStream extends TupleStream implements Expressible  {
     expression.addParameter(collection);
     
     // parameters
-    for(Entry<String,String> param : props.entrySet()){
-      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
+    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+    for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), String.join(",", param.getValue())));
     }
     
     // zkHost
@@ -170,8 +182,9 @@ public class StatsStream extends TupleStream implements Expressible  {
       // parallel stream.
     
     child.setImplementingClass("Solr/Lucene");
-    child.setExpressionType(ExpressionType.DATASTORE);    
-    child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+    child.setExpressionType(ExpressionType.DATASTORE);
+    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+    child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
     explanation.addChild(child);
     
     return explanation;
@@ -195,12 +208,12 @@ public class StatsStream extends TupleStream implements Expressible  {
           .build();
     }
 
-    ModifiableSolrParams params = getParams(this.props);
-    addStats(params, metrics);
-    params.add("stats", "true");
-    params.add("rows", "0");
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams(this.params);
+    addStats(paramsLoc, metrics);
+    paramsLoc.set("stats", "true");
+    paramsLoc.set("rows", "0");
 
-    QueryRequest request = new QueryRequest(params);
+    QueryRequest request = new QueryRequest(paramsLoc);
     try {
       NamedList response = cloudSolrClient.request(request, collection);
       this.tuple = getTuple(response);
@@ -275,15 +288,6 @@ public class StatsStream extends TupleStream implements Expressible  {
     }
   }
 
-  private ModifiableSolrParams getParams(Map<String, String> props) {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    for(String key : props.keySet()) {
-      String value = props.get(key);
-      params.add(key, value);
-    }
-    return params;
-  }
-
   private Tuple getTuple(NamedList response) {
 
     Map map = new HashMap();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index ff44109..8d3279a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -56,6 +56,9 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.slf4j.Logger;
@@ -75,6 +78,8 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
   private Map<String, Long> checkpoints = new HashMap<String, Long>();
   private String checkpointCollection;
 
+  // Use the TopicStream constructor that takes a SolrParams instead
+  @Deprecated
   public TopicStream(String zkHost,
                      String checkpointCollection,
                      String collection,
@@ -86,25 +91,42 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
          collection,
          id,
          checkpointEvery,
-         params);
+         new MapSolrParams(params));
   }
 
+  public TopicStream(String zkHost,
+                     String checkpointCollection,
+                     String collection,
+                     String id,
+                     long checkpointEvery,
+                     SolrParams params) {
+    init(zkHost,
+        checkpointCollection,
+        collection,
+        id,
+        checkpointEvery,
+        params);
+  }
+
+
   private void init(String zkHost,
                     String checkpointCollection,
                     String collection,
                     String id,
                     long checkpointEvery,
-                    Map<String, String> params) {
+                    SolrParams params) {
     this.zkHost  = zkHost;
-    this.params  = params;
+    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+    
+    if(mParams.getParams("rows") == null) {
+      mParams.set("rows", "500");
+    }
+    this.params  = mParams; 
     this.collection = collection;
     this.checkpointCollection = checkpointCollection;
     this.checkpointEvery = checkpointEvery;
     this.id = id;
     this.comp = new FieldComparator("_version_", ComparatorOrder.ASCENDING);
-    if(!params.containsKey("rows")) {
-      params.put("rows", "500");
-    }
   }
 
   public TopicStream(StreamExpression expression, StreamFactory factory) throws IOException{
@@ -147,12 +169,12 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
     }
 
-    Map<String,String> params = new HashMap<String,String>();
+    ModifiableSolrParams params = new ModifiableSolrParams();
     for(StreamExpressionNamedParameter namedParam : namedParams){
       if(!namedParam.getName().equals("zkHost") &&
           !namedParam.getName().equals("id") &&
           !namedParam.getName().equals("checkpointEvery")) {
-        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
+        params.set(namedParam.getName(), namedParam.getParameter().toString().trim());
       }
     }
 
@@ -189,8 +211,9 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     // collection
     expression.addParameter(collection);
 
-    for(Entry<String,String> param : params.entrySet()) {
-      String value = param.getValue();
+    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+    for(Entry<String, String[]> param : mParams.getMap().entrySet()) {
+      String value = String.join(",", param.getValue());
 
       // SOLR-8409: This is a special case where the params contain a " character
       // Do note that in any other BASE streams with parameters where a " might come into play
@@ -226,8 +249,10 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
         // parallel stream.
       
       child.setImplementingClass("Solr/Lucene");
-      child.setExpressionType(ExpressionType.DATASTORE);    
-      child.setExpression(params.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
+      child.setExpressionType(ExpressionType.DATASTORE);
+      
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
       explanation.addChild(child);
     }
     
@@ -254,8 +279,8 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     this.solrStreams = new ArrayList();
     this.eofTuples = Collections.synchronizedMap(new HashMap());
 
-    if(cache != null) {
-      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    if(streamContext.getSolrClientCache() != null) {
+      cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
     } else {
       cloudSolrClient = new Builder()
           .withZkHost(zkHost)
@@ -313,7 +338,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
         }
       }
 
-      if (cache == null) {
+      if (streamContext.getSolrClientCache() == null) {
         cloudSolrClient.close();
       }
     }
@@ -369,11 +394,11 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
   private long getCheckpoint(Slice slice, Set<String> liveNodes) throws IOException {
     Collection<Replica> replicas = slice.getReplicas();
     long checkpoint = -1;
-    Map params = new HashMap();
-    params.put("q","*:*");
-    params.put("sort", "_version_ desc");
-    params.put("distrib", "false");
-    params.put("rows", 1);
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("q","*:*");
+    params.set("sort", "_version_ desc");
+    params.set("distrib", "false");
+    params.set("rows", 1);
     for(Replica replica : replicas) {
       if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
         String coreUrl = replica.getCoreUrl();
@@ -432,7 +457,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
         if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())){
 
 
-          HttpSolrClient httpClient = cache.getHttpSolrClient(replica.getCoreUrl());
+          HttpSolrClient httpClient = streamContext.getSolrClientCache().getHttpSolrClient(replica.getCoreUrl());
           try {
 
             SolrDocument doc = httpClient.getById(id);
@@ -477,20 +502,19 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
           throw new Exception("Collection not found:" + this.collection);
         }
       }
-
-      params.put("distrib", "false"); // We are the aggregator.
-      String fl = params.get("fl");
-      params.put("sort", "_version_ asc");
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      mParams.set("distrib", "false"); // We are the aggregator.
+      String fl = mParams.get("fl");
+      mParams.set("sort", "_version_ asc");
       if(!fl.contains("_version_")) {
         fl += ",_version_";
       }
-      params.put("fl", fl);
+      mParams.set("fl", fl);
 
       Random random = new Random();
 
       for(Slice slice : slices) {
-        Map localParams = new HashMap();
-        localParams.putAll(params);
+        ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
         long checkpoint = checkpoints.get(slice.getName());
 
         Collection<Replica> replicas = slice.getReplicas();

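A minimal sketch of the new SolrParams-based TopicStream constructor; the collection names, topic id, and checkpointEvery value are placeholders. When "rows" is not set, init() now defaults it to 500 as shown in the diff above:

import org.apache.solr.client.solrj.io.stream.TopicStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class TopicStreamExample {
  public static TopicStream newTopic(String zkHost) {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("q", "*:*");
    params.set("fl", "id");
    // "rows" is deliberately left unset here; the stream falls back to 500.
    return new TopicStream(zkHost, "checkpointCollection", "collection1", "topic1", 1000, params);
  }
}
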
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
index c429fe8..79579d1 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java
@@ -397,7 +397,6 @@ public class GraphExpressionTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
     List<Tuple> tuples = null;
-    Set<String> paths = null;
     GatherNodesStream stream = null;
     StreamContext context = new StreamContext();
     SolrClientCache cache = new SolrClientCache();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
index 27c9dca..b9b6ed5 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
@@ -29,11 +29,13 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.StreamingTest;
 import org.apache.solr.client.solrj.io.stream.TupleStream;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.SolrParams;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -100,8 +102,7 @@ public class GraphTest extends SolrCloudTestCase {
     SolrClientCache cache = new SolrClientCache();
     context.setSolrClientCache(cache);
 
-    Map params = new HashMap();
-    params.put("fq", "predicate_s:knows");
+    SolrParams sParams = StreamingTest.mapParams("fq", "predicate_s:knows");
 
     stream = new ShortestPathStream(zkHost,
                                                        "collection1",
@@ -109,7 +110,7 @@ public class GraphTest extends SolrCloudTestCase {
                                                        "steve",
                                                         "from_s",
                                                         "to_s",
-                                                        params,
+                                                        sParams,
                                                         20,
                                                         3,
                                                         6);
@@ -131,7 +132,7 @@ public class GraphTest extends SolrCloudTestCase {
 
     //Test with batch size of 1
 
-    params.put("fq", "predicate_s:knows");
+    sParams = StreamingTest.mapParams("fq", "predicate_s:knows");
 
     stream = new ShortestPathStream(zkHost,
         "collection1",
@@ -139,7 +140,7 @@ public class GraphTest extends SolrCloudTestCase {
         "steve",
         "from_s",
         "to_s",
-        params,
+        sParams,
         1,
         3,
         6);
@@ -159,7 +160,7 @@ public class GraphTest extends SolrCloudTestCase {
 
     //Test with bad predicate
 
-    params.put("fq", "predicate_s:crap");
+    sParams = StreamingTest.mapParams("fq", "predicate_s:crap");
 
     stream = new ShortestPathStream(zkHost,
         "collection1",
@@ -167,7 +168,7 @@ public class GraphTest extends SolrCloudTestCase {
         "steve",
         "from_s",
         "to_s",
-        params,
+        sParams,
         1,
         3,
         6);
@@ -180,7 +181,7 @@ public class GraphTest extends SolrCloudTestCase {
 
     //Test with depth 2
 
-    params.put("fq", "predicate_s:knows");
+    sParams = StreamingTest.mapParams("fq", "predicate_s:knows");
 
     stream = new ShortestPathStream(zkHost,
         "collection1",
@@ -188,7 +189,7 @@ public class GraphTest extends SolrCloudTestCase {
         "steve",
         "from_s",
         "to_s",
-        params,
+        sParams,
         1,
         3,
         2);
@@ -202,7 +203,7 @@ public class GraphTest extends SolrCloudTestCase {
 
 
     //Take out alex
-    params.put("fq", "predicate_s:knows NOT to_s:alex");
+    sParams = StreamingTest.mapParams("fq", "predicate_s:knows NOT to_s:alex");
 
     stream = new ShortestPathStream(zkHost,
         "collection1",
@@ -210,7 +211,7 @@ public class GraphTest extends SolrCloudTestCase {
         "steve",
         "from_s",
         "to_s",
-        params,
+        sParams,
         10,
         3,
         6);

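The tests above and below call StreamingTest.mapParams(...), whose body is not included in this diff. A hypothetical equivalent (an assumption for illustration, not the committed helper) could look like:

import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;

public class ParamsUtil {
  // Hypothetical helper: builds SolrParams from alternating key/value strings,
  // e.g. mapParams("q", "*:*", "fl", "id").
  public static SolrParams mapParams(String... pairs) {
    if (pairs.length % 2 != 0) {
      throw new IllegalArgumentException("Expected an even number of key/value arguments");
    }
    ModifiableSolrParams params = new ModifiableSolrParams();
    for (int i = 0; i < pairs.length; i += 2) {
      params.add(pairs[i], pairs[i + 1]);
    }
    return params;
  }
}
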
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 1f1a5bf..c853e39 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -19,7 +19,6 @@ package org.apache.solr.client.solrj.io.stream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -47,6 +46,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -175,6 +175,26 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     assert(tuples.size() == 3);
     assertOrder(tuples, 0, 3, 4);
     assertLong(tuples.get(1), "a_i", 3);
+
+
+    // Test a couple of expressions with multiple fq clauses.
+    expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:hello0\", fq=\"a_s:hello1\", q=\"id:(*)\", " +
+        "zkHost=" + cluster.getZkServer().getZkAddress()+ ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assertEquals("fq clauses should have prevented any docs from coming back", tuples.size(), 0);
+
+
+    expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:(hello0 OR hello1)\", q=\"id:(*)\", " +
+        "zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+
+    assertEquals("Combining an f1 clause should show us 2 docs", tuples.size(), 2);
+
+
+
   }
 
   @Test
@@ -193,33 +213,33 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     TupleStream stream;
 
     // Basic test
-    Map<String,String> params = new HashMap<>();
-    params.put("expr","merge("
+    ModifiableSolrParams sParams = new ModifiableSolrParams();
+    sParams.set("expr", "merge("
         + "${q1},"
         + "${q2},"
         + "on=${mySort})");
-    params.put(CommonParams.QT, "/stream");
-    params.put("q1", "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
-    params.put("q2", "search(" + COLLECTION + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
-    params.put("mySort", "a_f asc");
-    stream = new SolrStream(url, params);
+    sParams.set(CommonParams.QT, "/stream");
+    sParams.set("q1", "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
+    sParams.set("q2", "search(" + COLLECTION + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
+    sParams.set("mySort", "a_f asc");
+    stream = new SolrStream(url, sParams);
     tuples = getTuples(stream);
 
     assertEquals(4, tuples.size());
     assertOrder(tuples, 0,1,3,4);
 
     // Basic test desc
-    params.put("mySort", "a_f desc");
-    stream = new SolrStream(url, params);
+    sParams.set("mySort", "a_f desc");
+    stream = new SolrStream(url, sParams);
     tuples = getTuples(stream);
 
     assertEquals(4, tuples.size());
     assertOrder(tuples, 4,3,1,0);
 
     // Basic w/ multi comp
-    params.put("q2", "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
-    params.put("mySort", "\"a_f asc, a_s asc\"");
-    stream = new SolrStream(url, params);
+    sParams.set("q2", "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
+    sParams.set("mySort", "\"a_f asc, a_s asc\"");
+    stream = new SolrStream(url, sParams);
     tuples = getTuples(stream);
 
     assertEquals(5, tuples.size());
@@ -2677,16 +2697,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     //Lets sleep long enough for daemon updates to run.
     //Lets stop the daemons
-    Map params = new HashMap();
-    params.put(CommonParams.QT,"/stream");
-    params.put("action","list");
+    ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list"));
 
     int workersComplete = 0;
     for(JettySolrRunner jetty : cluster.getJettySolrRunners()) {
       int iterations = 0;
       INNER:
       while(iterations == 0) {
-        SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", params);
+        SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
         solrStream.open();
         Tuple tupleResponse = solrStream.read();
         if (tupleResponse.EOF) {
@@ -2714,27 +2732,27 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     cluster.getSolrClient().commit("parallelDestinationCollection1");
 
     //Lets stop the daemons
-    params = new HashMap();
-    params.put(CommonParams.QT,"/stream");
-    params.put("action", "stop");
-    params.put("id", "test");
+    sParams = new ModifiableSolrParams();
+    sParams.set(CommonParams.QT, "/stream");
+    sParams.set("action", "stop");
+    sParams.set("id", "test");
     for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
-      SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", params);
+      SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams);
       solrStream.open();
       Tuple tupleResponse = solrStream.read();
       solrStream.close();
     }
 
-    params = new HashMap();
-    params.put(CommonParams.QT,"/stream");
-    params.put("action","list");
+    sParams = new ModifiableSolrParams();
+    sParams.set(CommonParams.QT, "/stream");
+    sParams.set("action", "list");
 
     workersComplete = 0;
     for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
       long stopTime = 0;
       INNER:
       while(stopTime == 0) {
-        SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", params);
+        SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams);
         solrStream.open();
         Tuple tupleResponse = solrStream.read();
         if (tupleResponse.EOF) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b4defc/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 9db02eb..1789759 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -43,6 +42,8 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -107,8 +108,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
     List<Tuple> tuples = getTuples(ustream);
     assertEquals(4, tuples.size());
@@ -119,13 +120,13 @@ public class StreamingTest extends SolrCloudTestCase {
   @Test
   public void testSpacesInParams() throws Exception {
 
-    Map params = mapParams("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f  asc , a_i  asc");
+    SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f  asc , a_i  asc");
 
     //CloudSolrStream compares the values of the sort with the fl field.
     //The constructor will throw an exception if the sort fields do not the
     //a value in the field list.
 
-    CloudSolrStream stream = new CloudSolrStream("", "collection1", params);
+    CloudSolrStream stream = new CloudSolrStream("", "collection1", sParams);
   }
 
   @Test
@@ -144,8 +145,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
 
     attachStreamFactory(pstream);
@@ -170,8 +171,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
     ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
     attachStreamFactory(pstream);
@@ -187,6 +188,31 @@ public class StreamingTest extends SolrCloudTestCase {
   }
 
   @Test
+  public void testMultipleFqClauses() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_ss", "hello0", "a_ss", "hello1", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_ss", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_ss", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_ss", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_ss", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_ss", "hello1", "a_i", "10", "a_f", "1")
+        .add(id, "6", "a_ss", "hello1", "a_i", "11", "a_f", "5")
+        .add(id, "7", "a_ss", "hello1", "a_i", "12", "a_f", "5")
+        .add(id, "8", "a_ss", "hello1", "a_i", "13", "a_f", "4")
+        .commit(cluster.getSolrClient(), COLLECTION);
+
+    streamFactory.withCollectionZkHost(COLLECTION, zkHost);
+
+    ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i", 
+        "sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    List<Tuple> tuples = getTuples(stream);
+    assertEquals("Multiple fq clauses should have been honored", tuples.size(), 1);
+    assertEquals("should only have gotten back document 0", tuples.get(0).getString("id"), "0");
+  }
+
+  @Test
   public void testRankStream() throws Exception {
 
     new UpdateRequest()
@@ -198,8 +224,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
 
-    Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     List<Tuple> tuples = getTuples(rstream);
 
@@ -224,8 +250,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     attachStreamFactory(pstream);
@@ -253,8 +279,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test with spaces in the parameter lists.
-    Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f   asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     stream.setTrace(true);
     List<Tuple> tuples = getTuples(stream);
     assert(tuples.get(0).get("_COLLECTION_").equals(COLLECTION));
@@ -280,8 +306,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test with spaces in the parameter lists.
-    Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     ReducerStream rstream  = new ReducerStream(stream,
                                                new FieldEqualitor("a_s"),
                                                new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
@@ -303,8 +329,8 @@ public class StreamingTest extends SolrCloudTestCase {
     assertMaps(maps2, 4, 6);
 
     //Test with spaces in the parameter lists using a comparator
-    paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
-    stream  = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     rstream = new ReducerStream(stream,
                                 new FieldComparator("a_s", ComparatorOrder.ASCENDING),
                                 new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
@@ -345,8 +371,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test with spaces in the parameter lists.
-    Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     ReducerStream rstream = new ReducerStream(stream,
                                               new FieldEqualitor("a_s"),
                                               new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
@@ -373,8 +399,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
     ReducerStream rstream = new ReducerStream(stream,
                                               new FieldEqualitor("a_s"),
@@ -401,8 +427,8 @@ public class StreamingTest extends SolrCloudTestCase {
 
     //Test Descending with Ascending subsort
 
-    paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
-    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
     rstream = new ReducerStream(stream,
                                 new FieldEqualitor("a_s"),
@@ -447,8 +473,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test an error that originates from the /select handler
-    Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     ExceptionStream estream = new ExceptionStream(stream);
     Tuple t = getTuple(estream);
     assert(t.EOF);
@@ -456,8 +482,8 @@ public class StreamingTest extends SolrCloudTestCase {
     assert(t.getException().contains("sort param field can't be found: blah"));
 
     //Test an error that originates from the /export handler
-    paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export");
-    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     estream = new ExceptionStream(stream);
     t = getTuple(estream);
     assert(t.EOF);
@@ -483,8 +509,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
     ExceptionStream estream = new ExceptionStream(pstream);
     Tuple t = getTuple(estream);
@@ -495,8 +521,8 @@ public class StreamingTest extends SolrCloudTestCase {
 
 
     //Test an error that originates from the /select handler
-    paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys","a_s");
-    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys", "a_s");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
     estream = new ExceptionStream(pstream);
     t = getTuple(estream);
@@ -506,8 +532,8 @@ public class StreamingTest extends SolrCloudTestCase {
 
 
     //Test an error that originates from the /export handler
-    paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export", "partitionKeys","a_s");
-    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export", "partitionKeys", "a_s");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
     estream = new ExceptionStream(pstream);
     t = getTuple(estream);
@@ -533,7 +559,7 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q", "*:*");
+    SolrParams sParamsA = mapParams("q", "*:*");
 
     Metric[] metrics = {new SumMetric("a_i"),
                         new SumMetric("a_f"),
@@ -545,7 +571,7 @@ public class StreamingTest extends SolrCloudTestCase {
                         new MeanMetric("a_f"),
                         new CountMetric()};
 
-    StatsStream statsStream = new StatsStream(zkHost, COLLECTION, paramsA, metrics);
+    StatsStream statsStream = new StatsStream(zkHost, COLLECTION, sParamsA, metrics);
 
     List<Tuple> tuples = getTuples(statsStream);
 
@@ -593,7 +619,7 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
 
     Bucket[] buckets =  {new Bucket("a_s")};
 
@@ -610,7 +636,7 @@ public class StreamingTest extends SolrCloudTestCase {
     FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
                                                    ComparatorOrder.ASCENDING)};
 
-    FacetStream facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
+    FacetStream facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
 
     List<Tuple> tuples = getTuples(facetStream);
 
@@ -692,7 +718,7 @@ public class StreamingTest extends SolrCloudTestCase {
 
     sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
 
-    facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
+    facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
 
     tuples = getTuples(facetStream);
 
@@ -775,7 +801,7 @@ public class StreamingTest extends SolrCloudTestCase {
     sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
 
 
-    facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
+    facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
 
     tuples = getTuples(facetStream);
 
@@ -856,7 +882,7 @@ public class StreamingTest extends SolrCloudTestCase {
 
     sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
 
-    facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
+    facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
 
     tuples = getTuples(facetStream);
 
@@ -949,7 +975,7 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q","*:*","fl","a_i,a_f");
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
 
     Bucket[] buckets =  {new Bucket("level1_s"), new Bucket("level2_s")};
 
@@ -961,7 +987,7 @@ public class StreamingTest extends SolrCloudTestCase {
     FacetStream facetStream = new FacetStream(
         zkHost,
         COLLECTION,
-        paramsA,
+        sParamsA,
         buckets,
         metrics,
         sorts,
@@ -1041,7 +1067,7 @@ public class StreamingTest extends SolrCloudTestCase {
     facetStream = new FacetStream(
         zkHost,
         COLLECTION,
-        paramsA,
+        sParamsA,
         buckets,
         metrics,
         sorts,
@@ -1134,8 +1160,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
     Bucket[] buckets =  {new Bucket("a_s")};
 
@@ -1234,8 +1260,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "qt", "/export");
-    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
     Bucket[] buckets1 =  {new Bucket("a_s")};
 
@@ -1285,12 +1311,9 @@ public class StreamingTest extends SolrCloudTestCase {
     SolrClientCache cache = new SolrClientCache();
     context.setSolrClientCache(cache);
 
-    Map params = new HashMap();
-    params.put("q","a_s:hello0");
-    params.put("rows", "500");
-    params.put("fl", "id");
+    SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
 
-    TopicStream topicStream = new TopicStream(zkHost, COLLECTION, COLLECTION, "50000000", 1000000, params);
+    TopicStream topicStream = new TopicStream(zkHost, COLLECTION, COLLECTION, "50000000", 1000000, sParams);
 
     DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
     daemonStream.setStreamContext(context);
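
As an aside, code that still builds a Map<String, String> of single-valued parameters (as the removed lines above did) does not have to be rewritten call by call; a small sketch, assuming the map holds only single-valued parameters:

    // Hypothetical transition snippet: MapSolrParams gives a read-only SolrParams view
    // of an existing Map, and ModifiableSolrParams copies it when mutation is needed.
    Map<String, String> legacy = new HashMap<>();
    legacy.put("q", "a_s:hello0");
    legacy.put("rows", "500");
    legacy.put("fl", "id");
    SolrParams wrapped = new MapSolrParams(legacy);
    ModifiableSolrParams editable = new ModifiableSolrParams(wrapped);
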
@@ -1300,13 +1323,11 @@ public class StreamingTest extends SolrCloudTestCase {
     // Wait for the checkpoint
     JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
 
-    Map params1 = new HashMap();
-    params1.put("qt","/get");
-    params1.put("ids","50000000");
-    params1.put("fl","id");
+
+    SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
     int count = 0;
     while(count == 0) {
-      SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTION, params1);
+      SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTION, sParams1);
       List<Tuple> tuples = getTuples(solrStream);
       count = tuples.size();
       if(count > 0) {
@@ -1364,8 +1385,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
     Bucket[] buckets =  {new Bucket("a_s")};
 
@@ -1475,8 +1496,8 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map paramsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
     ReducerStream rstream = new ReducerStream(stream,
                                               new FieldEqualitor("a_s"),
                                               new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2));
@@ -1497,8 +1518,8 @@ public class StreamingTest extends SolrCloudTestCase {
                  "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f,s_multi,i_multi,f_multi", "sort", "a_s asc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     List<Tuple> tuples = getTuples(stream);
     Tuple tuple = tuples.get(0);
 
@@ -1538,11 +1559,11 @@ public class StreamingTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test ascending
-    Map paramsA = mapParams("q","id:(4 1)","fl","id,a_s,a_i","sort", "a_i asc");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "id:(4 1)", "fl", "id,a_s,a_i", "sort", "a_i asc");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
-    Map paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i asc");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
+    SolrParams sParamsB = mapParams("q", "id:(0 2 3)", "fl", "id,a_s,a_i", "sort", "a_i asc");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
 
     MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     List<Tuple> tuples = getTuples(mstream);
@@ -1551,11 +1572,11 @@ public class StreamingTest extends SolrCloudTestCase {
     assertOrder(tuples, 0,1,2,3,4);
 
     //Test descending
-    paramsA = mapParams("q","id:(4 1)","fl","id,a_s,a_i","sort", "a_i desc");
-    streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "id:(4 1)", "fl", "id,a_s,a_i", "sort", "a_i desc");
+    streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
-    paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i desc");
-    streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
+    sParamsB = mapParams("q", "id:(0 2 3)", "fl", "id,a_s,a_i", "sort", "a_i desc");
+    streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
 
     mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     tuples = getTuples(mstream);
@@ -1565,11 +1586,11 @@ public class StreamingTest extends SolrCloudTestCase {
 
     //Test compound sort
 
-    paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
-    streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "id:(2 4 1)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
+    streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
-    paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
-    streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
+    sParamsB = mapParams("q", "id:(0 3)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
+    streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
 
     mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING)));
     tuples = getTuples(mstream);
@@ -1577,11 +1598,11 @@ public class StreamingTest extends SolrCloudTestCase {
     assert(tuples.size() == 5);
     assertOrder(tuples, 0,2,1,3,4);
 
-    paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
-    streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "id:(2 4 1)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
+    streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
-    paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
-    streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
+    sParamsB = mapParams("q", "id:(0 3)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
+    streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
 
     mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
     tuples = getTuples(mstream);
@@ -1608,11 +1629,11 @@ public class StreamingTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test ascending
-    Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
-    Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
+    SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
 
     MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
@@ -1623,11 +1644,11 @@ public class StreamingTest extends SolrCloudTestCase {
     assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
 
     //Test descending
-    paramsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
-    streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    sParamsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
+    streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
-    paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
-    streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
+    sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
+    streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
 
     mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
@@ -1656,11 +1677,11 @@ public class StreamingTest extends SolrCloudTestCase {
         .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test ascending
-    Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
 
-    Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
+    SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
 
     MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
@@ -1685,20 +1706,19 @@ public class StreamingTest extends SolrCloudTestCase {
         .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
         .commit(cluster.getSolrClient(), COLLECTION);
 
-    Map params = null;
 
     //Basic CloudSolrStream Test with Descending Sort
 
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i desc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i desc");
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     List<Tuple> tuples = getTuples(stream);
 
     assert(tuples.size() == 5);
     assertOrder(tuples, 4, 3, 2, 1, 0);
 
     //With Ascending Sort
-    params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     tuples = getTuples(stream);
 
     assert(tuples.size() == 5);
@@ -1706,16 +1726,16 @@ public class StreamingTest extends SolrCloudTestCase {
 
 
     //Test compound sort
-    params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
-    stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     tuples = getTuples(stream);
 
     assert(tuples.size() == 5);
     assertOrder(tuples, 2,0,1,3,4);
 
 
-    params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
-    stream = new CloudSolrStream(zkHost, COLLECTION, params);
+    sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
+    stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
     tuples = getTuples(stream);
 
     assert (tuples.size() == 5);
@@ -1723,21 +1743,6 @@ public class StreamingTest extends SolrCloudTestCase {
 
   }
 
-  protected Map mapParams(String... vals) {
-    Map params = new HashMap();
-    String k = null;
-    for(String val : vals) {
-      if(k == null) {
-        k = val;
-      } else {
-        params.put(k, val);
-        k = null;
-      }
-    }
-
-    return params;
-  }
-
   protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
     tupleStream.open();
     List<Tuple> tuples = new ArrayList();
@@ -1819,4 +1824,15 @@ public class StreamingTest extends SolrCloudTestCase {
     streamContext.setStreamFactory(streamFactory);
     tupleStream.setStreamContext(streamContext);
   }
+
+  public static SolrParams mapParams(String... vals) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    assertEquals("Parameters passed in here must be in pairs!", 0, (vals.length % 2));
+    for (int idx = 0; idx < vals.length; idx += 2) {
+      params.add(vals[idx], vals[idx + 1]);
+    }
+
+    return params;
+  }
+
 }
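
The reworked mapParams helper above is public and static, so other tests can reuse it; a brief illustration (field values are hypothetical), showing that repeated keys now become multi-valued parameters because the helper calls ModifiableSolrParams.add() rather than Map.put():

    SolrParams p = StreamingTest.mapParams(
        "q", "*:*",
        "fq", "a_ss:hello0",
        "fq", "a_ss:hello1",
        "sort", "a_i asc");
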

