lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From romseyg...@apache.org
Subject [1/5] lucene-solr:master: SOLR-9065: Migrate SolrJ tests to SolrCloudTestCase
Date Wed, 04 May 2016 19:18:52 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/master e15bab37a -> 630a8c950


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/630a8c95/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 61253e1..9db02eb 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -16,22 +16,19 @@
  */
 package org.apache.solr.client.solrj.io.stream;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.LuceneTestCase.Slow;
-import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
 import org.apache.solr.client.solrj.io.ops.GroupOperation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@@ -42,13 +39,13 @@ import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
 import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
-import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
-import org.apache.solr.cloud.AbstractZkTestCase;
-import org.apache.solr.common.SolrInputDocument;
-import org.junit.After;
-import org.junit.AfterClass;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -57,106 +54,70 @@ import org.junit.Test;
  *
  **/
 
-@Slow
 @LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
-public class StreamingTest extends AbstractFullDistribZkTestBase {
-
-  private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
-  private StreamFactory streamFactory;
+public class StreamingTest extends SolrCloudTestCase {
 
-  static {
-    schemaString = "schema-streaming.xml";
-  }
+  public static final int TIMEOUT = 30;
 
-  @BeforeClass
-  public static void beforeSuperClass() {
-    AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME());
-  }
+  public static final String COLLECTION = "streams";
 
-  @AfterClass
-  public static void afterSuperClass() {
+  private static final StreamFactory streamFactory = new StreamFactory()
+      .withFunctionName("search", CloudSolrStream.class)
+      .withFunctionName("merge", MergeStream.class)
+      .withFunctionName("unique", UniqueStream.class)
+      .withFunctionName("top", RankStream.class)
+      .withFunctionName("reduce", ReducerStream.class)
+      .withFunctionName("group", GroupOperation.class)
+      .withFunctionName("rollup", RollupStream.class)
+      .withFunctionName("parallel", ParallelStream.class);
 
-  }
+  private static String zkHost;
 
-  protected String getCloudSolrConfig() {
-    return "solrconfig-streaming.xml";
-  }
+  @BeforeClass
+  public static void configureCluster() throws Exception {
+    configureCluster(2)
+        .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+        .configure();
 
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);
 
-  @Override
-  public String getSolrHome() {
-    return SOLR_HOME;
+    zkHost = cluster.getZkServer().getZkAddress();
+    streamFactory.withCollectionZkHost(COLLECTION, zkHost);
   }
 
-  public static String SOLR_HOME() {
-    return SOLR_HOME;
-  }
+  private static final String id = "id";
 
   @Before
-  @Override
-  public void setUp() throws Exception {
-    super.setUp();
-    // we expect this time of exception as shards go up and down...
-    //ignoreException(".*");
-    //System.setProperty("export.test", "true");
-    System.setProperty("numShards", Integer.toString(sliceCount));
+  public void clearCollection() throws Exception {
+    new UpdateRequest()
+        .deleteByQuery("*:*")
+        .commit(cluster.getSolrClient(), COLLECTION);
   }
 
-  @Override
-  @After
-  public void tearDown() throws Exception {
-    super.tearDown();
-    resetExceptionIgnores();
-  }
-
-  public StreamingTest() {
-    super();
-    sliceCount = 2;
-
-    streamFactory = new StreamFactory()
-                    .withFunctionName("search", CloudSolrStream.class)
-                    .withFunctionName("merge", MergeStream.class)
-                    .withFunctionName("unique", UniqueStream.class)
-                    .withFunctionName("top", RankStream.class)
-                    .withFunctionName("reduce", ReducerStream.class)
-                    .withFunctionName("group", GroupOperation.class)
-                    .withFunctionName("rollup", RollupStream.class)
-                    .withFunctionName("parallel", ParallelStream.class);
-  }
-
-  private void testUniqueStream() throws Exception {
+  @Test
+  public void testUniqueStream() throws Exception {
 
     //Test CloudSolrStream and UniqueStream
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-
-    commit();
-
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
     UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
     List<Tuple> tuples = getTuples(ustream);
-    assert(tuples.size() == 4);
+    assertEquals(4, tuples.size());
     assertOrder(tuples, 0,1,3,4);
 
-    del("*:*");
-    commit();
-
   }
 
-
-  private void testSpacesInParams() throws Exception {
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testSpacesInParams() throws Exception {
 
     Map params = mapParams("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f  asc , a_i  asc");
 
@@ -164,66 +125,55 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     //The constructor will throw an exception if the sort fields do not the
     //a value in the field list.
 
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
-
-    del("*:*");
-    commit();
-
+    CloudSolrStream stream = new CloudSolrStream("", "collection1", params);
   }
 
-  private void testNonePartitionKeys() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testNonePartitionKeys() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
 
     attachStreamFactory(pstream);
     List<Tuple> tuples = getTuples(pstream);
 
     assert(tuples.size() == 20); // Each tuple will be double counted.
 
-    del("*:*");
-    commit();
-
   }
 
-  private void testParallelUniqueStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5");
-    indexr(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5");
-    indexr(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testParallelUniqueStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
+        .add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
+        .add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
+        .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
     UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
     attachStreamFactory(pstream);
     List<Tuple> tuples = getTuples(pstream);
     assert(tuples.size() == 5);
@@ -234,129 +184,104 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     Map<String,Tuple> eofTuples = pstream.getEofTuples();
     assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
 
-    del("*:*");
-    commit();
-
   }
 
+  @Test
+  public void testRankStream() throws Exception {
 
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
-  private void testRankStream() throws Exception {
-
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
 
     Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
     RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     List<Tuple> tuples = getTuples(rstream);
 
-
     assert(tuples.size() == 3);
     assertOrder(tuples, 4,3,2);
 
-    del("*:*");
-    commit();
   }
 
-  private void testParallelRankStream() throws Exception {
-
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1");
-    indexr(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1");
-    indexr(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1");
-    indexr(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1");
-    indexr(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1");
-    indexr(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testParallelRankStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1")
+        .add(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1")
+        .add(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1")
+        .add(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1")
+        .add(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1")
+        .add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
     RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     attachStreamFactory(pstream);
     List<Tuple> tuples = getTuples(pstream);
 
     assert(tuples.size() == 10);
     assertOrder(tuples, 10,9,8,7,6,5,4,3,2,0);
 
-    del("*:*");
-    commit();
   }
 
-  private void testTrace() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testTrace() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test with spaces in the parameter lists.
     Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
     stream.setTrace(true);
     List<Tuple> tuples = getTuples(stream);
-    assert(tuples.get(0).get("_COLLECTION_").equals("collection1"));
-    assert(tuples.get(1).get("_COLLECTION_").equals("collection1"));
-    assert(tuples.get(2).get("_COLLECTION_").equals("collection1"));
-    assert(tuples.get(3).get("_COLLECTION_").equals("collection1"));
-
-    del("*:*");
-    commit();
+    assert(tuples.get(0).get("_COLLECTION_").equals(COLLECTION));
+    assert(tuples.get(1).get("_COLLECTION_").equals(COLLECTION));
+    assert(tuples.get(2).get("_COLLECTION_").equals(COLLECTION));
+    assert(tuples.get(3).get("_COLLECTION_").equals(COLLECTION));
   }
 
-
-
-
-  private void testReducerStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testReducerStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test with spaces in the parameter lists.
     Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
     ReducerStream rstream  = new ReducerStream(stream,
                                                new FieldEqualitor("a_s"),
                                                new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
@@ -379,7 +304,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     //Test with spaces in the parameter lists using a comparator
     paramsA = mapParams("q","*:*","fl","id,a_s, a_i,  a_f","sort", "a_s asc  ,  a_f   asc");
-    stream  = new CloudSolrStream(zkHost, "collection1", paramsA);
+    stream  = new CloudSolrStream(zkHost, COLLECTION, paramsA);
     rstream = new ReducerStream(stream,
                                 new FieldComparator("a_s", ComparatorOrder.ASCENDING),
                                 new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
@@ -400,32 +325,28 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     maps2 = t2.getMaps("group");
     assertMaps(maps2, 6, 4);
 
-    del("*:*");
-    commit();
   }
 
-  private void testZeroReducerStream() throws Exception {
+  @Test
+  public void testZeroReducerStream() throws Exception {
 
     //Gracefully handle zero results
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test with spaces in the parameter lists.
     Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
     ReducerStream rstream = new ReducerStream(stream,
                                               new FieldEqualitor("a_s"),
                                               new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
@@ -434,37 +355,32 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     assert(tuples.size() == 0);
 
-    del("*:*");
-    commit();
   }
 
-
-  private void testParallelReducerStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testParallelReducerStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     ReducerStream rstream = new ReducerStream(stream,
                                               new FieldEqualitor("a_s"),
                                               new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
 
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
 
     attachStreamFactory(pstream);
     List<Tuple> tuples = getTuples(pstream);
@@ -486,13 +402,13 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     //Test Descending with Ascending subsort
 
     paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
-    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     rstream = new ReducerStream(stream,
                                 new FieldEqualitor("a_s"),
                                 new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
 
-    pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
+    pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
 
     attachStreamFactory(pstream);
     tuples = getTuples(pstream);
@@ -511,34 +427,28 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     maps2 = t2.getMaps("group");
     assertMaps(maps2, 0, 2, 1);
 
-
-
-    del("*:*");
-    commit();
   }
 
-
-  private void testExceptionStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-
+  @Test
+  @Ignore
+  public void testExceptionStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test an error that comes originates from the /select handler
     Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
     ExceptionStream estream = new ExceptionStream(stream);
     Tuple t = getTuple(estream);
     assert(t.EOF);
@@ -547,7 +457,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     //Test an error that comes originates from the /export handler
     paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export");
-    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
     estream = new ExceptionStream(stream);
     t = getTuple(estream);
     assert(t.EOF);
@@ -556,26 +466,26 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assert(t.getException().contains("undefined field:"));
   }
 
-  private void testParallelExceptionStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
+  @Test
+  @Ignore
+  public void testParallelExceptionStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    ParallelStream pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
     ExceptionStream estream = new ExceptionStream(pstream);
     Tuple t = getTuple(estream);
     assert(t.EOF);
@@ -586,8 +496,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     //Test an error that originates from the /select handler
     paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys","a_s");
-    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
+    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
     estream = new ExceptionStream(pstream);
     t = getTuple(estream);
     assert(t.EOF);
@@ -597,8 +507,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     //Test an error that originates from the /export handler
     paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export", "partitionKeys","a_s");
-    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
-    pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
+    pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
     estream = new ExceptionStream(pstream);
     t = getTuple(estream);
     assert(t.EOF);
@@ -607,23 +517,21 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assert(t.getException().contains("undefined field:"));
   }
 
-
-  private void testStatsStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
+  @Test
+  public void testStatsStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q", "*:*");
 
@@ -637,10 +545,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
                         new MeanMetric("a_f"),
                         new CountMetric()};
 
-    StatsStream statsStream = new StatsStream(zkHost,
-                                              "collection1",
-                                              paramsA,
-                                              metrics);
+    StatsStream statsStream = new StatsStream(zkHost, COLLECTION, paramsA, metrics);
 
     List<Tuple> tuples = getTuples(statsStream);
 
@@ -670,26 +575,23 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(avgf.doubleValue() == 5.5D);
     assertTrue(count.doubleValue() == 10);
 
-    del("*:*");
-    commit();
   }
 
-  private void testFacetStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
+  @Test
+  public void testFacetStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
 
@@ -708,13 +610,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
                                                    ComparatorOrder.ASCENDING)};
 
-    FacetStream facetStream = new FacetStream(zkHost,
-                                              "collection1",
-                                              paramsA,
-                                              buckets,
-                                              metrics,
-                                              sorts,
-                                              100);
+    FacetStream facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
 
     List<Tuple> tuples = getTuples(facetStream);
 
@@ -796,13 +692,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
 
-    facetStream = new FacetStream(zkHost,
-                                  "collection1",
-                                  paramsA,
-                                  buckets,
-                                  metrics,
-                                  sorts,
-                                  100);
+    facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
 
     tuples = getTuples(facetStream);
 
@@ -885,13 +775,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
 
 
-    facetStream = new FacetStream(zkHost,
-                                  "collection1",
-                                  paramsA,
-                                  buckets,
-                                  metrics,
-                                  sorts,
-                                  100);
+    facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
 
     tuples = getTuples(facetStream);
 
@@ -922,7 +806,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(avgf.doubleValue() == 5.5D);
     assertTrue(count.doubleValue() == 2);
 
-
     tuple = tuples.get(1);
     bucket = tuple.getString("a_s");
     sumi = tuple.getDouble("sum(a_i)");
@@ -946,7 +829,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(avgf.doubleValue() == 6.5D);
     assertTrue(count.doubleValue() == 4);
 
-
     tuple = tuples.get(2);
     bucket = tuple.getString("a_s");
     sumi = tuple.getDouble("sum(a_i)");
@@ -974,19 +856,12 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
 
-    facetStream = new FacetStream(zkHost,
-                                  "collection1",
-                                  paramsA,
-                                  buckets,
-                                  metrics,
-                                  sorts,
-                                  100);
+    facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
 
     tuples = getTuples(facetStream);
 
     assert(tuples.size() == 3);
 
-
     tuple = tuples.get(0);
     bucket = tuple.getString("a_s");
     sumi = tuple.getDouble("sum(a_i)");
@@ -1010,7 +885,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(avgf.doubleValue() == 4.5D);
     assertTrue(count.doubleValue() == 4);
 
-
     tuple = tuples.get(1);
     bucket = tuple.getString("a_s");
     sumi = tuple.getDouble("sum(a_i)");
@@ -1034,7 +908,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(avgf.doubleValue() == 6.5D);
     assertTrue(count.doubleValue() == 4);
 
-
     tuple = tuples.get(2);
     bucket = tuple.getString("a_s");
     sumi = tuple.getDouble("sum(a_i)");
@@ -1058,27 +931,23 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(avgf.doubleValue() == 5.5D);
     assertTrue(count.doubleValue() == 2);
 
-    del("*:*");
-    commit();
   }
 
-
-  private void testSubFacetStream() throws Exception {
-
-    indexr(id, "0", "level1_s", "hello0", "level2_s", "a", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "level1_s", "hello0", "level2_s", "a", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "level1_s", "hello3", "level2_s", "a", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "level1_s", "hello4", "level2_s", "a", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "level1_s", "hello0", "level2_s", "b", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "level1_s", "hello3", "level2_s", "b", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "level1_s", "hello4", "level2_s", "b", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
+  @Test
+  public void testSubFacetStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "level1_s", "hello0", "level2_s", "a", "a_i", "0", "a_f", "1")
+        .add(id, "2", "level1_s", "hello0", "level2_s", "a", "a_i", "2", "a_f", "2")
+        .add(id, "3", "level1_s", "hello3", "level2_s", "a", "a_i", "3", "a_f", "3")
+        .add(id, "4", "level1_s", "hello4", "level2_s", "a", "a_i", "4", "a_f", "4")
+        .add(id, "1", "level1_s", "hello0", "level2_s", "b", "a_i", "1", "a_f", "5")
+        .add(id, "5", "level1_s", "hello3", "level2_s", "b", "a_i", "10", "a_f", "6")
+        .add(id, "6", "level1_s", "hello4", "level2_s", "b", "a_i", "11", "a_f", "7")
+        .add(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8")
+        .add(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9")
+        .add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q","*:*","fl","a_i,a_f");
 
@@ -1089,10 +958,9 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING), new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)};
 
-
     FacetStream facetStream = new FacetStream(
         zkHost,
-        "collection1",
+        COLLECTION,
         paramsA,
         buckets,
         metrics,
@@ -1172,7 +1040,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     sorts[1] =  new FieldComparator("level2_s", ComparatorOrder.DESCENDING );
     facetStream = new FacetStream(
         zkHost,
-        "collection1",
+        COLLECTION,
         paramsA,
         buckets,
         metrics,
@@ -1248,29 +1116,26 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(sumi.longValue() == 2);
     assertTrue(count.doubleValue() == 2);
 
-    del("*:*");
-    commit();
   }
 
-  private void testRollupStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
+  @Test
+  public void testRollupStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     Bucket[] buckets =  {new Bucket("a_s")};
 
@@ -1365,11 +1230,12 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
 
     //Test will null value in the grouping field
-    indexr(id, "12", "a_s", null, "a_i", "14", "a_f", "10");
-    commit();
+    new UpdateRequest()
+        .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "qt", "/export");
-    stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     Bucket[] buckets1 =  {new Bucket("a_s")};
 
@@ -1410,15 +1276,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(avgf.doubleValue() == 10.0D);
     assertTrue(count.doubleValue() == 1);
 
-
-    del("*:*");
-    commit();
   }
 
-
-  private void testDaemonTopicStream() throws Exception {
-
-    String zkHost = zkServer.getZkAddress();
+  @Test
+  public void testDaemonTopicStream() throws Exception {
 
     StreamContext context = new StreamContext();
     SolrClientCache cache = new SolrClientCache();
@@ -1429,7 +1290,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     params.put("rows", "500");
     params.put("fl", "id");
 
-    TopicStream topicStream = new TopicStream(zkHost, "collection1", "collection1", "50000000", 1000000, params);
+    TopicStream topicStream = new TopicStream(zkHost, COLLECTION, COLLECTION, "50000000", 1000000, params);
 
     DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
     daemonStream.setStreamContext(context);
@@ -1437,7 +1298,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     daemonStream.open();
 
     // Wait for the checkpoint
-    CloudJettyRunner jetty = this.cloudJettys.get(0);
+    JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
 
     Map params1 = new HashMap();
     params1.put("qt","/get");
@@ -1445,7 +1306,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     params1.put("fl","id");
     int count = 0;
     while(count == 0) {
-      SolrStream solrStream = new SolrStream(jetty.url, params1);
+      SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTION, params1);
       List<Tuple> tuples = getTuples(solrStream);
       count = tuples.size();
       if(count > 0) {
@@ -1456,24 +1317,22 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
       }
     }
 
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-
-    commit();
-
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     for(int i=0; i<5; i++) {
       daemonStream.read();
     }
 
-
-    indexr(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4");
-    indexr(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4");
-
-    commit();
+    new UpdateRequest()
+        .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
+        .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     for(int i=0; i<2; i++) {
       daemonStream.read();
@@ -1486,30 +1345,27 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(tuple.EOF);
     daemonStream.close();
     cache.close();
-    del("*:*");
-    commit();
-  }
-
-  private void testParallelRollupStream() throws Exception {
 
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
+  }
 
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testParallelRollupStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     Bucket[] buckets =  {new Bucket("a_s")};
 
@@ -1524,7 +1380,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
                         new CountMetric()};
 
     RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
-    ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    ParallelStream parallelStream = new ParallelStream(zkHost, COLLECTION, rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
     attachStreamFactory(parallelStream);
     List<Tuple> tuples = getTuples(parallelStream);
 
@@ -1601,55 +1457,48 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertTrue(avgf.doubleValue() == 5.5D);
     assertTrue(count.doubleValue() == 2);
 
-    del("*:*");
-    commit();
   }
 
-  private void testZeroParallelReducerStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
-    indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
-    indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
-    indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
-    indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
-    indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
-    indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testZeroParallelReducerStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map paramsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
     ReducerStream rstream = new ReducerStream(stream,
                                               new FieldEqualitor("a_s"),
                                               new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2));
 
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
 
     attachStreamFactory(pstream);
     List<Tuple> tuples = getTuples(pstream);
     assert(tuples.size() == 0);
-    del("*:*");
-    commit();
-  }
-
 
-  private void testTuple() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3");
+  }
 
-    commit();
+  @Test
+  public void testTuple() throws Exception {
 
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi",
+                 "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
     List<Tuple> tuples = getTuples(stream);
     Tuple tuple = tuples.get(0);
 
@@ -1675,29 +1524,25 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assert(doubleList.get(0).doubleValue() == 1.2);
     assert(doubleList.get(1).doubleValue() == 1.3);
 
-    del("*:*");
-    commit();
   }
 
-  private void testMergeStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-
-    commit();
+  @Test
+  public void testMergeStream() throws Exception {
 
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test ascending
     Map paramsA = mapParams("q","id:(4 1)","fl","id,a_s,a_i","sort", "a_i asc");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     Map paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i asc");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
 
     MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     List<Tuple> tuples = getTuples(mstream);
@@ -1707,10 +1552,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     //Test descending
     paramsA = mapParams("q","id:(4 1)","fl","id,a_s,a_i","sort", "a_i desc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+    streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i desc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+    streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
 
     mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     tuples = getTuples(mstream);
@@ -1721,10 +1566,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     //Test compound sort
 
     paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+    streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+    streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
 
     mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING)));
     tuples = getTuples(mstream);
@@ -1733,10 +1578,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assertOrder(tuples, 0,2,1,3,4);
 
     paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+    streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+    streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
 
     mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
     tuples = getTuples(mstream);
@@ -1744,38 +1589,33 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     assert(tuples.size() == 5);
     assertOrder(tuples, 2,0,1,3,4);
 
-    del("*:*");
-    commit();
   }
 
-
-  private void testParallelMergeStream() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
-    indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
-    indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
-    indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
-    indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testParallelMergeStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0")
+        .add(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0")
+        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3")
+        .add(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4")
+        .add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test ascending
     Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
 
     MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     attachStreamFactory(pstream);
     List<Tuple> tuples = getTuples(pstream);
 
@@ -1784,50 +1624,46 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     //Test descending
     paramsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
-    streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+    streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
-    streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+    streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
 
     mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
-    pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+    pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
     attachStreamFactory(pstream);
     tuples = getTuples(pstream);
 
     assert(tuples.size() == 8);
     assertOrder(tuples, 9,8,6,4,3,2,1,0);
 
-    del("*:*");
-    commit();
   }
 
-  private void testParallelEOF() throws Exception {
-
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-    indexr(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0");
-    indexr(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0");
-    indexr(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3");
-    indexr(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4");
-    indexr(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
+  @Test
+  public void testParallelEOF() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .add(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0")
+        .add(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0")
+        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3")
+        .add(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4")
+        .add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
     //Test ascending
     Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
+    CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
 
     Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
-    CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
+    CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
 
     MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
-    ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+    ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
     
     attachStreamFactory(pstream);
     List<Tuple> tuples = getTuples(pstream);
@@ -1836,41 +1672,25 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
     Map<String, Tuple> eofTuples = pstream.getEofTuples();
     assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker.
 
-    del("*:*");
-    commit();
   }
 
-
-
   @Test
   public void streamTests() throws Exception {
-    assertNotNull(cloudClient);
-
-    handle.clear();
-    handle.put("timestamp", SKIPVAL);
-
-    waitForRecoveriesToFinish(false);
-
-    del("*:*");
 
-    commit();
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTION);
 
-    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
-    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
-    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
-    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
-    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
-
-    commit();
-
-    String zkHost = zkServer.getZkAddress();
-    streamFactory.withCollectionZkHost("collection1", zkHost);
     Map params = null;
 
     //Basic CloudSolrStream Test with Descending Sort
 
     params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i desc");
-    CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
+    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
     List<Tuple> tuples = getTuples(stream);
 
     assert(tuples.size() == 5);
@@ -1878,7 +1698,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     //With Ascending Sort
     params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
+    stream = new CloudSolrStream(zkHost, COLLECTION, params);
     tuples = getTuples(stream);
 
     assert(tuples.size() == 5);
@@ -1887,7 +1707,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     //Test compound sort
     params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
+    stream = new CloudSolrStream(zkHost, COLLECTION, params);
     tuples = getTuples(stream);
 
     assert(tuples.size() == 5);
@@ -1895,38 +1715,12 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
 
     params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
-    stream = new CloudSolrStream(zkHost, "collection1", params);
+    stream = new CloudSolrStream(zkHost, COLLECTION, params);
     tuples = getTuples(stream);
 
     assert (tuples.size() == 5);
     assertOrder(tuples, 0, 2, 1, 3, 4);
 
-    del("*:*");
-    commit();
-
-    testTuple();
-    testSpacesInParams();
-    testNonePartitionKeys();
-    testTrace();
-    testUniqueStream();
-    testRankStream();
-    testMergeStream();
-    testReducerStream();
-    testRollupStream();
-    testZeroReducerStream();
-    testFacetStream();
-    testSubFacetStream();
-    testStatsStream();
-    //testExceptionStream();
-    testDaemonTopicStream();
-    testParallelEOF();
-    testParallelUniqueStream();
-    testParallelRankStream();
-    testParallelMergeStream();
-    testParallelRollupStream();
-    testParallelReducerStream();
-    //testParallelExceptionStream();
-    testZeroParallelReducerStream();
   }
 
   protected Map mapParams(String... vals) {
@@ -2019,12 +1813,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
 
     return true;
   }
-
-  @Override
-  protected void indexr(Object... fields) throws Exception {
-    SolrInputDocument doc = getDoc(fields);
-    indexDoc(doc);
-  }
   
   private void attachStreamFactory(TupleStream tupleStream) {
     StreamContext streamContext = new StreamContext();


Mime
View raw message