hive-issues mailing list archives

From "ASF GitHub Bot (Jira)" <j...@apache.org>
Subject [jira] [Work logged] (HIVE-20683) Add the Ability to push Dynamic Between and Bloom filters to Druid
Date Mon, 09 Sep 2019 13:58:00 GMT

     [ https://issues.apache.org/jira/browse/HIVE-20683?focusedWorklogId=308883&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308883 ]

ASF GitHub Bot logged work on HIVE-20683:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Sep/19 13:57
            Start Date: 09/Sep/19 13:57
    Worklog Time Spent: 10m 
      Work Description: nishantmonu51 commented on pull request #723: [HIVE-20683] Add the Ability to push Dynamic Between and Bloom filters to Druid
URL: https://github.com/apache/hive/pull/723#discussion_r322256584
 
 

 ##########
 File path: druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
 ##########
 @@ -894,4 +945,255 @@ public static IndexSpec getIndexSpec(Configuration jc) {
     ImmutableList<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
     return Pair.of(dimensions, aggregatorFactories.toArray(new AggregatorFactory[0]));
   }
+
+  // Druid only supports String, Long, Float and Double selectors.
+  private static final Set<TypeInfo> druidSupportedTypeInfos = ImmutableSet.<TypeInfo>of(
+      TypeInfoFactory.stringTypeInfo, TypeInfoFactory.charTypeInfo,
+      TypeInfoFactory.varcharTypeInfo, TypeInfoFactory.byteTypeInfo,
+      TypeInfoFactory.intTypeInfo, TypeInfoFactory.longTypeInfo,
+      TypeInfoFactory.shortTypeInfo, TypeInfoFactory.doubleTypeInfo
+  );
+
+  private static final Set<TypeInfo> stringTypeInfos = ImmutableSet.<TypeInfo>of(
+      TypeInfoFactory.stringTypeInfo,
+      TypeInfoFactory.charTypeInfo, TypeInfoFactory.varcharTypeInfo
+  );
+
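+  /**
+   * Rewrites the given Druid query by translating the Hive filter expression into a
+   * Druid DimFilter and AND-ing it with the query's existing filter. When
+   * resolveDynamicValues is false (i.e. during explain), dynamic values are rendered
+   * as expression strings and bloom filters as dummy placeholders.
+   */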
+  public static org.apache.druid.query.Query addDynamicFilters(org.apache.druid.query.Query query,
+      ExprNodeGenericFuncDesc filterExpr, Configuration conf, boolean resolveDynamicValues
+  ) {
+    List<VirtualColumn> virtualColumns = Arrays
+        .asList(getVirtualColumns(query).getVirtualColumns());
+    org.apache.druid.query.Query rv = query;
+    DimFilter joinReductionFilter = toDruidFilter(filterExpr, conf, virtualColumns,
+        resolveDynamicValues
+    );
+    if (joinReductionFilter != null) {
+      String type = query.getType();
+      DimFilter filter = new AndDimFilter(joinReductionFilter, query.getFilter());
+      switch (type) {
+      case org.apache.druid.query.Query.TIMESERIES:
+        rv = Druids.TimeseriesQueryBuilder.copy((TimeseriesQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.TOPN:
+        rv = new TopNQueryBuilder((TopNQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.GROUP_BY:
+        rv = new GroupByQuery.Builder((GroupByQuery) query)
+            .setDimFilter(filter)
+            .setVirtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.SCAN:
+        rv = ScanQuery.ScanQueryBuilder.copy((ScanQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.SELECT:
+        rv = Druids.SelectQueryBuilder.copy((SelectQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported Query type " + type);
+      }
+    }
+    return rv;
+  }
+
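+  // Translates a Hive filter expression into a Druid DimFilter: AND/OR are handled
+  // recursively, BETWEEN becomes a BoundDimFilter and IN_BLOOM_FILTER a BloomDimFilter.
+  // Returns null when the expression cannot be pushed down to Druid.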
+  @Nullable
+  private static DimFilter toDruidFilter(ExprNodeDesc filterExpr, Configuration configuration,
+      List<VirtualColumn> virtualColumns, boolean resolveDynamicValues
+  ) {
+    if (filterExpr == null) {
+      return null;
+    }
+    Class<? extends GenericUDF> genericUDFClass = getGenericUDFClassFromExprDesc(filterExpr);
+    if (FunctionRegistry.isOpAnd(filterExpr)) {
+      Iterator<ExprNodeDesc> iterator = filterExpr.getChildren().iterator();
+      List<DimFilter> delegates = Lists.newArrayList();
+      while (iterator.hasNext()) {
+        DimFilter filter = toDruidFilter(iterator.next(), configuration, virtualColumns,
+            resolveDynamicValues
+        );
+        if (filter != null) {
+          delegates.add(filter);
+        }
+      }
+      // A partial AND is still safe to push down: dropping untranslatable children
+      // only makes the pushed filter looser than the original.
+      if (!delegates.isEmpty()) {
+        return new AndDimFilter(delegates);
+      }
+    }
+    if (FunctionRegistry.isOpOr(filterExpr)) {
+      Iterator<ExprNodeDesc> iterator = filterExpr.getChildren().iterator();
+      List<DimFilter> delegates = Lists.newArrayList();
+      while (iterator.hasNext()) {
+        DimFilter filter = toDruidFilter(iterator.next(), configuration, virtualColumns,
+            resolveDynamicValues
+        );
+        if (filter != null) {
+          delegates.add(filter);
+        }
+      }
+      // An OR is only safe to push if every child could be translated; a partial OR
+      // would be stricter than the original expression and could drop valid rows.
+      if (delegates.size() == filterExpr.getChildren().size()) {
+        return new OrDimFilter(delegates);
+      }
+    } else if (GenericUDFBetween.class == genericUDFClass) {
+      List<ExprNodeDesc> child = filterExpr.getChildren();
+      // For BETWEEN, child 0 is the invert flag and child 1 the column expression;
+      // the dynamic filters generated for semi-join reduction are not inverted.
+      String col = extractColName(child.get(1), virtualColumns);
+      if (col != null) {
+        try {
+          StringComparator comparator = stringTypeInfos.contains(child.get(1).getTypeInfo())
+              ? StringComparators.LEXICOGRAPHIC
+              : StringComparators.NUMERIC;
+          String lower = evaluate(child.get(2), configuration, resolveDynamicValues);
+          String upper = evaluate(child.get(3), configuration, resolveDynamicValues);
+          return new BoundDimFilter(col, lower, upper, false, false, null, null, comparator);
+        } catch (HiveException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    } else if (GenericUDFInBloomFilter.class == genericUDFClass) {
+      List<ExprNodeDesc> child = filterExpr.getChildren();
+      String col = extractColName(child.get(0), virtualColumns);
+      if (col != null) {
+        try {
+          BloomKFilter bloomFilter = evaluateBloomFilter(child.get(1), configuration,
+              resolveDynamicValues
+          );
+          return new BloomDimFilter(col, BloomKFilterHolder.fromBloomKFilter(bloomFilter), null);
+        } catch (HiveException | IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return null;
+  }
+
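+  // Evaluates an expression to the string used as a filter bound; at explain time a
+  // dynamic value is not yet available, so its expression string is returned instead.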
+  private static String evaluate(ExprNodeDesc desc, Configuration configuration,
+      boolean resolveDynamicValue
+  ) throws HiveException {
+    ExprNodeEvaluator exprNodeEvaluator = ExprNodeEvaluatorFactory.get(desc, configuration);
+    if (exprNodeEvaluator instanceof ExprNodeDynamicValueEvaluator && !resolveDynamicValue) {
+      return desc.getExprStringForExplain();
+    } else {
+      return exprNodeEvaluator.evaluate(null).toString();
+    }
+  }
+
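+  // Deserializes the runtime bloom filter produced by the semi-join branch.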
+  private static BloomKFilter evaluateBloomFilter(ExprNodeDesc desc, Configuration configuration,
+      boolean resolveDynamicValue
+  )
+      throws HiveException, IOException {
+    if (!resolveDynamicValue) {
+      // Return a dummy bloom filter for explain; the real bytes are only available at run time.
+      return new BloomKFilter(1);
+    } else {
+      BytesWritable bw = (BytesWritable) ExprNodeEvaluatorFactory.get(desc, configuration)
+              .evaluate(null);
+      return BloomKFilter.deserialize(ByteBuffer.wrap(bw.getBytes()));
+    }
+  }
+
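+  // Returns the Druid column name for the expression, or null when the column type
+  // cannot be pushed down or the expression is not a column reference.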
+  public static String extractColName(ExprNodeDesc expr, List<VirtualColumn> virtualColumns) {
+    if (!druidSupportedTypeInfos.contains(expr.getTypeInfo())) {
+      // This column type is currently not supported in Druid (e.g. boolean).
+      // We cannot pass the bloom filter to Druid, since the bloom filter tests for exact object bytes.
+      return null;
+    }
+    if (expr instanceof ExprNodeColumnDesc) {
 Review comment:
   formatted
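 
 For reference, a standalone sketch of the two Druid filters this translation produces (the column name and expected entry count are illustrative, not from the patch; note that BloomKFilterHolder.fromBloomKFilter throws IOException):
 
     // BETWEEN on a numeric column becomes a bound filter with a numeric comparator.
     DimFilter between = new BoundDimFilter("userId", "10", "500",
         false, false, null, null, StringComparators.NUMERIC);
     // in_bloom_filter(...) becomes a bloom filter over the serialized filter bytes.
     DimFilter bloom = new BloomDimFilter("userId",
         BloomKFilterHolder.fromBloomKFilter(new BloomKFilter(1000)), null);
     // addDynamicFilters then ANDs the result into the query's existing filter.
     DimFilter combined = new AndDimFilter(Arrays.asList(between, bloom));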
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 308883)
    Time Spent: 5h  (was: 4h 50m)

> Add the Ability to push Dynamic Between and Bloom filters to Druid
> ------------------------------------------------------------------
>
>                 Key: HIVE-20683
>                 URL: https://issues.apache.org/jira/browse/HIVE-20683
>             Project: Hive
>          Issue Type: New Feature
>          Components: Druid integration
>            Reporter: Nishant Bangarwa
>            Assignee: Nishant Bangarwa
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-20683.1.patch, HIVE-20683.10.patch, HIVE-20683.2.patch, HIVE-20683.3.patch, HIVE-20683.4.patch, HIVE-20683.5.patch, HIVE-20683.6.patch, HIVE-20683.8.patch, HIVE-20683.patch
>
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> For optimizing joins, Hive generates a BETWEEN filter with min-max values and a BLOOM filter for filtering one side of a semi-join.
> Druid 0.13.0 has support for Bloom filters (added via https://github.com/apache/incubator-druid/pull/6222).
> Implementation details:
> # Hive generates and passes the filters as part of 'filterExpr' in TableScan.
> # DruidQueryBasedRecordReader gets this filter passed as part of the conf.
> # During the execution phase, before sending the query to Druid, DruidQueryBasedRecordReader deserializes this filter, translates it into a Druid DimFilter, and adds it to the existing Druid query. The Tez executor already ensures that all dynamic values are initialized by the time we start reading results from the record reader.
> # Explaining a Druid query also prints the query sent to Druid as {{druid.json.query}}, so we also need to make sure the printed query is updated with the filters. During explain we do not have the actual values for the dynamic values, so instead of the values we print the dynamic expression itself as part of the Druid query.
> Note: this work requires Druid to be upgraded to version 0.13.0.
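
A minimal usage sketch of steps 3 and 4 above (the query/expression variable names are illustrative, not from the patch): the same entry point serves both execution and explain, differing only in the resolveDynamicValues flag.

    // Execution time: dynamic values resolve to real min/max bounds and bloom bytes.
    org.apache.druid.query.Query rewritten =
        DruidStorageHandlerUtils.addDynamicFilters(druidQuery, filterExpr, conf, true);

    // Explain time: BETWEEN bounds print as the dynamic expression strings and the
    // bloom filter is a dummy placeholder, matching what druid.json.query shows.
    org.apache.druid.query.Query explained =
        DruidStorageHandlerUtils.addDynamicFilters(druidQuery, filterExpr, conf, false);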



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
