lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kris...@apache.org
Subject [2/2] lucene-solr:jira/solr-8593: Improve group by aggregates
Date Thu, 05 May 2016 15:25:07 GMT
Improve group by aggregates


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3b28ec05
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3b28ec05
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3b28ec05

Branch: refs/heads/jira/solr-8593
Commit: 3b28ec056faebc4b04f9dddc9f70d68c692901d4
Parents: 983ebba
Author: Kevin Risden <krisden@apache.org>
Authored: Thu May 5 10:24:57 2016 -0500
Committer: Kevin Risden <krisden@apache.org>
Committed: Thu May 5 10:24:57 2016 -0500

----------------------------------------------------------------------
 .../org/apache/solr/handler/sql/SolrTable.java  | 132 ++++++++++++++++++-
 1 file changed, 128 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3b28ec05/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
index 753e9f8..2f82ae2 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
@@ -28,10 +28,12 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTableQueryable;
 import org.apache.calcite.util.Pair;
-import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
-import org.apache.solr.client.solrj.io.stream.RollupStream;
-import org.apache.solr.client.solrj.io.stream.StatsStream;
-import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
+import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.*;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.*;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.update.VersionInfo;
@@ -105,6 +107,14 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable
{
       }
 
       for(Metric metric : metrics) {
+        List<String> newOrderList = new ArrayList<>();
+        for(String orderItem : orderList) {
+          if(!orderItem.startsWith(metric.getIdentifier())) {
+            newOrderList.add(orderItem);
+          }
+        }
+        orderList = newOrderList;
+
         for(String column : metric.getColumns()) {
           if (!fieldsList.contains(column)) {
             fieldsList.add(column);
@@ -169,6 +179,47 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable
{
           solrParams.put(CommonParams.FL, String.join(",", fieldsList));
           tupleStream = new CloudSolrStream(zk, collection, solrParams);
           tupleStream = new RollupStream(tupleStream, bucketsList.toArray(new Bucket[bucketsList.size()]),
metricsArray);
+
+          String sortDirection = getSortDirection(orderList);
+
+          int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
+          if(numWorkers > 1) {
+            String workerZkHost = properties.getProperty("workerZkhost");
+            String workerCollection = properties.getProperty("workerCollection");
+            // Do the rollups in parallel
+            // Maintain the sort of the Tuples coming from the workers.
+            StreamComparator comp = bucketSortComp(bucketsList, sortDirection);
+            ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection,
tupleStream, numWorkers, comp);
+
+            StreamFactory factory = new StreamFactory()
+                .withFunctionName("search", CloudSolrStream.class)
+                .withFunctionName("parallel", ParallelStream.class)
+                .withFunctionName("rollup", RollupStream.class)
+                .withFunctionName("sum", SumMetric.class)
+                .withFunctionName("min", MinMetric.class)
+                .withFunctionName("max", MaxMetric.class)
+                .withFunctionName("avg", MeanMetric.class)
+                .withFunctionName("count", CountMetric.class);
+
+            parallelStream.setStreamFactory(factory);
+            tupleStream = parallelStream;
+          }
+
+          if (!sortsEqual(bucketsList, sortDirection, orderList)) {
+            // The requested ORDER BY differs from the order in which the rollup emits tuples,
+            // so the rolled-up tuples must be re-ranked. If a parallel stream is used, ALL
+            // rolled-up tuples from the workers are ranked, giving a true top or bottom N.
+            // NOTE(review): without a LIMIT only 100 tuples are kept here — confirm this default.
+            int limitVal = limit == null ? 100 : Integer.parseInt(limit);
+            StreamComparator comp = getComp(orderList);
+            tupleStream = new RankStream(tupleStream, limitVal, comp);
+          } else if (limit != null) {
+            // The rollup already emits tuples in the requested order, so only the number of
+            // tuples needs limiting. Wrap the existing rollup (or parallel) stream: building a
+            // new CloudSolrStream here would throw away the aggregation and return raw documents.
+            // ROWS is intentionally not set, since it limits raw search documents, not the
+            // rolled-up tuples.
+            tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
+          }
         }
       }
     } catch (IOException e) {
@@ -185,6 +236,79 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable
{
     };
   }
 
+  /**
+   * Builds a comparator that orders tuples by each GROUP BY bucket in turn, using the same
+   * direction for every bucket.
+   *
+   * @param buckets the buckets, in grouping order; each bucket's {@code toString()} is used as
+   *                the sort key
+   * @param dir     direction string handed to {@link ComparatorOrder#fromString}
+   * @return a single {@link FieldComparator} when there is exactly one bucket, otherwise a
+   *         {@link MultipleFieldComparator} over all buckets
+   */
+  private static StreamComparator bucketSortComp(List<Bucket> buckets, String dir) {
+    FieldComparator[] fieldComps = new FieldComparator[buckets.size()];
+    int pos = 0;
+    for (Bucket bucket : buckets) {
+      ComparatorOrder order = ComparatorOrder.fromString(dir);
+      fieldComps[pos++] = new FieldComparator(bucket.toString(), order);
+    }
+    return fieldComps.length == 1 ? fieldComps[0] : new MultipleFieldComparator(fieldComps);
+  }
+
+  /**
+   * Checks whether the ORDER BY items match the buckets one-to-one: same count, each item
+   * naming the bucket at the same position, and each item's direction equal
+   * (case-insensitively) to {@code direction}.
+   *
+   * @param buckets   the GROUP BY buckets, in grouping order
+   * @param direction the direction every item must use
+   * @param orderList ORDER BY items of the form {@code "field direction"}
+   * @return {@code true} only when every position matches on both field and direction
+   */
+  private boolean sortsEqual(List<Bucket> buckets, String direction, List<String> orderList) {
+    int count = buckets.size();
+    if (orderList.size() != count) {
+      return false;
+    }
+
+    for (int idx = 0; idx < count; idx++) {
+      String item = orderList.get(idx);
+      boolean fieldMatches = buckets.get(idx).toString().equals(getSortField(item));
+      // Short-circuit: the direction is only inspected once the field has matched.
+      if (!fieldMatches || !getSortDirection(item).equalsIgnoreCase(direction)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+
+  /**
+   * Returns the direction of the first ORDER BY item, or {@code "asc"} when there are no
+   * items. Only the first item is consulted; the caller applies that single direction to
+   * every bucket.
+   *
+   * @param orderList ORDER BY items of the form {@code "field direction"}
+   * @return the first item's direction, or {@code "asc"} for an empty list
+   */
+  private String getSortDirection(List<String> orderList) {
+    return orderList.isEmpty() ? "asc" : getSortDirection(orderList.get(0));
+  }
+
+  /**
+   * Extracts the field name from an ORDER BY item such as {@code "field desc"}: everything
+   * before the first space, or the whole item when it contains no space.
+   */
+  private String getSortField(String orderItem) {
+    int firstSpace = orderItem.indexOf(' ');
+    return firstSpace < 0 ? orderItem : orderItem.substring(0, firstSpace);
+  }
+
+  /**
+   * Extracts the sort direction from an ORDER BY item such as {@code "field desc"}.
+   *
+   * @param orderItem a field name, optionally followed by a space and a direction
+   * @return the text after the first space with surrounding whitespace trimmed, or
+   *         {@code "asc"} when the item carries no direction
+   */
+  private String getSortDirection(String orderItem) {
+    String[] orderParts = orderItem.split(" ", 2);
+    // split() never yields null elements: a missing direction shows up as a one-element
+    // array (indexing [1] would throw) or as blank text after the space.
+    if (orderParts.length < 2 || orderParts[1].trim().isEmpty()) {
+      return "asc";
+    }
+    return orderParts[1].trim();
+  }
+
+  /**
+   * Builds a comparator from the ORDER BY items. Each item contributes one field comparator
+   * with its own direction, applied in ORDER BY order.
+   *
+   * @param orderList ORDER BY items of the form {@code "field direction"}
+   * @return a single {@link FieldComparator} when there is exactly one item, otherwise a
+   *         {@link MultipleFieldComparator} over all items
+   */
+  private StreamComparator getComp(List<String> orderList) {
+    FieldComparator[] fieldComps = new FieldComparator[orderList.size()];
+    int pos = 0;
+    for (String item : orderList) {
+      ComparatorOrder order = ComparatorOrder.fromString(getSortDirection(item));
+      fieldComps[pos++] = new FieldComparator(getSortField(item), order);
+    }
+    return fieldComps.length == 1 ? fieldComps[0] : new MultipleFieldComparator(fieldComps);
+  }
+
   private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs)
{
     List<Metric> metrics = new ArrayList<>(metricPairs.size());
     for(Pair<String, String> metricPair : metricPairs) {


Mime
View raw message