lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbern...@apache.org
Subject [02/11] lucene-solr:branch_6x: SOLR-8996: Add Random Streaming Expression
Date Mon, 02 May 2016 13:29:33 GMT
SOLR-8996: Add Random Streaming Expression


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/60a9e386
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/60a9e386
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/60a9e386

Branch: refs/heads/branch_6x
Commit: 60a9e386687d04fc4163731897b5ba9d6f5dd8c0
Parents: d77574a
Author: jbernste <jbernste@apache.org>
Authored: Mon Apr 18 16:02:07 2016 -0400
Committer: jbernste <jbernste@apache.org>
Committed: Mon May 2 08:29:29 2016 -0400

----------------------------------------------------------------------
 .../client/solrj/io/stream/RandomStream.java    | 211 +++++++++++++++++++
 .../solr/collection1/conf/schema-streaming.xml  |   5 +
 .../solrj/io/stream/StreamExpressionTest.java   |  72 +++++++
 3 files changed, 288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/60a9e386/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
new file mode 100644
index 0000000..3eb35c1
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Iterator;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+/**
+ *  The RandomStream emits a stream of pseudo random Tuples that match the query parameters. Sample expression syntax:
+ *  random(collection, q="Hello word", rows="50", fl="title, body")
+ **/
+
+public class RandomStream extends TupleStream implements Expressible  {
+
+  private String zkHost;
+  private Map<String, String> props;
+  private String collection;
+  protected transient SolrClientCache cache;
+  protected transient CloudSolrClient cloudSolrClient;
+  private Iterator<SolrDocument> documentIterator;
+
+  public RandomStream(String zkHost,
+                      String collection,
+                     Map<String, String> props) throws IOException {
+    init(zkHost, collection, props);
+  }
+
+  public RandomStream(StreamExpression expression, StreamFactory factory) throws IOException{
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+
+    // Collection Name
+    if(null == collectionName){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
+    }
+
+    // Named parameters - passed directly to solr as solrparams
+    if(0 == namedParams.size()){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
+    }
+
+    // pull out known named params
+    Map<String,String> params = new HashMap<String,String>();
+    for(StreamExpressionNamedParameter namedParam : namedParams){
+      if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("buckets") && !namedParam.getName().equals("bucketSorts") && !namedParam.getName().equals("limit")){
+        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if(null == zkHostExpression){
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if(zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    }
+    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
+      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
+    }
+    if(null == zkHost){
+      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
+    }
+
+    // We've got all the required items
+    init(zkHost, collectionName, params);
+  }
+
+  private void init(String zkHost, String collection, Map<String, String> props) throws IOException {
+    this.zkHost  = zkHost;
+    this.props   = props;
+    this.collection = collection;
+  }
+
+  @Override
+  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+
+    // collection
+    expression.addParameter(collection);
+
+    // parameters
+    for(Entry<String,String> param : props.entrySet()){
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
+    }
+
+    // zkHost
+    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+
+    return expression;
+  }
+
+
+  public void setStreamContext(StreamContext context) {
+    cache = context.getSolrClientCache();
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l =  new ArrayList();
+    return l;
+  }
+
+  public void open() throws IOException {
+    if(cache != null) {
+      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    } else {
+      cloudSolrClient = new CloudSolrClient(zkHost);
+    }
+
+    ModifiableSolrParams params = getParams(this.props);
+
+    params.remove("sort"); //Override any sort.
+
+    Random rand = new Random();
+    int seed = rand.nextInt();
+
+    String sortField = "random_"+seed;
+    params.add("sort", sortField+" asc");
+
+    QueryRequest request = new QueryRequest(params);
+    try {
+      QueryResponse response = request.process(cloudSolrClient, collection);
+      SolrDocumentList docs = response.getResults();
+      documentIterator = docs.iterator();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void close() throws IOException {
+    if(cache == null) {
+      cloudSolrClient.close();
+    }
+  }
+
+  public Tuple read() throws IOException {
+    if(documentIterator.hasNext()) {
+      Map map = new HashMap();
+      SolrDocument doc = documentIterator.next();
+      for(String key  : doc.keySet()) {
+        map.put(key, doc.get(key));
+      }
+      return new Tuple(map);
+    } else {
+      Map fields = new HashMap();
+      fields.put("EOF", true);
+      Tuple tuple = new Tuple(fields);
+      return tuple;
+    }
+  }
+
+  private ModifiableSolrParams getParams(Map<String, String> props) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    for(String key : props.keySet()) {
+      String value = props.get(key);
+      params.add(key, value);
+    }
+    return params;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+  @Override
+  public StreamComparator getStreamSort() {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/60a9e386/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml b/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml
index 7a7ee52..575b622 100644
--- a/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml
+++ b/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml
@@ -51,6 +51,8 @@
     <fieldType name="tlong" class="solr.TrieLongField" precisionStep="8" omitNorms="true" positionIncrementGap="0"/>
     <fieldType name="tdouble" class="solr.TrieDoubleField" precisionStep="8" omitNorms="true" positionIncrementGap="0"/>
 
+    <fieldType name="random" class="solr.RandomSortField" indexed="true" />
+
     <!-- numeric field types that manipulate the value into
        a string value that isn't human readable in it's internal form,
        but sorts correctly and supports range queries.
@@ -564,6 +566,9 @@
 
     <dynamicField name="*_mfacet" type="string" indexed="true" stored="false" multiValued="true" />
 
+    <dynamicField name="random_*" type="random" />
+
+
     <!-- make sure custom sims work with dynamic fields -->
     <!--
     <dynamicField name="*_sim1" type="sim1" indexed="true" stored="true"/>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/60a9e386/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 9ae6761..868afd5 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Collections;
 
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -140,6 +141,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     testNulls();
     testTopicStream();
     testDaemonStream();
+    testRandomStream();
     testParallelUniqueStream();
     testParallelReducerStream();
     testParallelRankStream();
@@ -557,6 +559,76 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
     del("*:*");
     commit();
   }
+
+  private void testRandomStream() throws Exception {
+
+    indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
+    indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
+    indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
+    indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
+    indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
+    commit();
+
+    StreamExpression expression;
+    TupleStream stream;
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost("collection1", zkServer.getZkAddress())
+        .withFunctionName("random", RandomStream.class);
+
+
+    StreamContext context = new StreamContext();
+    SolrClientCache cache = new SolrClientCache();
+    try {
+      context.setSolrClientCache(cache);
+
+      expression = StreamExpressionParser.parse("random(collection1, q=\"*:*\", rows=\"10\", fl=\"id, a_i\")");
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(context);
+      List<Tuple> tuples1 = getTuples(stream);
+      assert (tuples1.size() == 5);
+
+      expression = StreamExpressionParser.parse("random(collection1, q=\"*:*\", rows=\"10\", fl=\"id, a_i\")");
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(context);
+      List<Tuple> tuples2 = getTuples(stream);
+      assert (tuples2.size() == 5);
+
+      boolean different = false;
+      for (int i = 0; i < tuples1.size(); i++) {
+        Tuple tuple1 = tuples1.get(i);
+        Tuple tuple2 = tuples2.get(i);
+        if (!tuple1.get("id").equals(tuple2.get(id))) {
+          different = true;
+          break;
+        }
+      }
+
+      assertTrue(different);
+
+      Collections.sort(tuples1, new FieldComparator("id", ComparatorOrder.ASCENDING));
+      Collections.sort(tuples2, new FieldComparator("id", ComparatorOrder.ASCENDING));
+
+      for (int i = 0; i < tuples1.size(); i++) {
+        Tuple tuple1 = tuples1.get(i);
+        Tuple tuple2 = tuples2.get(i);
+        if (!tuple1.get("id").equals(tuple2.get(id))) {
+          assert(tuple1.getLong("id").equals(tuple2.get("a_i")));
+        }
+      }
+
+      expression = StreamExpressionParser.parse("random(collection1, q=\"*:*\", rows=\"1\", fl=\"id, a_i\")");
+      stream = factory.constructStream(expression);
+      stream.setStreamContext(context);
+      List<Tuple> tuples3 = getTuples(stream);
+      assert (tuples3.size() == 1);
+
+    } finally {
+      cache.close();
+      del("*:*");
+      commit();
+    }
+  }
   
   private void testReducerStream() throws Exception{
     indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");


Mime
View raw message