spark-dev mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: Re: Spark 1.6.0 + Hive + HBase
Date Thu, 28 Jan 2016 15:35:33 GMT
Under sql/hive/src/main/scala/org/apache/spark/sql/hive/execution, I only
see HiveTableScan and HiveNativeCommand.
At the beginning of HiveTableScan:

 * The Hive table scan operator.  Column and partition pruning are both
handled.

Looks like filter pushdown hasn't been implemented.

As far as I know, Huawei's Astro can do filter pushdown:

 http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase

On Thu, Jan 28, 2016 at 5:20 AM, 开心延年 <muyannian@qq.com> wrote:

> This is not Hive's bug. I tested Hive on my storage and it works.
>
> But when I test it through spark-sql, the
> TableScanDesc.FILTER_EXPR_CONF_STR parameter is not passed,
>
> so that is what causes the full scan.
>
> The source code in HiveHBaseTableInputFormat is as follows; it shows why
> the full scan happens.
>
>
>   private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary)
>       throws IOException {
>
>     // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
>
>     Scan scan = new Scan();
>     String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
>     if (filterObjectSerialized != null) {
>       HBaseScanRange range =
>           Utilities.deserializeObject(filterObjectSerialized, HBaseScanRange.class);
>       try {
>         range.setup(scan, jobConf);
>       } catch (Exception e) {
>         throw new IOException(e);
>       }
>       return scan;
>     }
>
>     String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
>     if (filterExprSerialized == null) {
>       return scan;
>     }
>
>     ExprNodeGenericFuncDesc filterExpr =
>       Utilities.deserializeExpression(filterExprSerialized);
>
>     String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
>     String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];
>     boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
>
>     String tsColName = null;
>     if (iTimestamp >= 0) {
>       tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
>     }
>
>
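The early return in the quoted method is easier to see in a toy model. The Python sketch below is only an illustration under my own assumptions, not Hive code: plain dicts stand in for JobConf and Scan, the two key strings are placeholders rather than the real values of TableScanDesc.FILTER_OBJECT_CONF_STR and TableScanDesc.FILTER_EXPR_CONF_STR, and the "serialized filter" is simply a (start_row, stop_row) pair.

```python
# Toy model only: dicts stand in for Hadoop's JobConf and HBase's Scan.
# Both keys are hypothetical placeholders, not the real Hive constants' values.
FILTER_OBJECT_KEY = "filter.object"
FILTER_EXPR_KEY = "filter.expr"


def create_filter_scan(job_conf):
    """Return a description of the scan that would be built from job_conf."""
    # No start/stop row means the scan is unbounded, i.e. a full table scan.
    scan = {"start_row": None, "stop_row": None}

    filter_object = job_conf.get(FILTER_OBJECT_KEY)
    if filter_object is not None:
        scan["start_row"], scan["stop_row"] = filter_object
        return scan

    filter_expr = job_conf.get(FILTER_EXPR_KEY)
    if filter_expr is None:
        # Nothing was passed down by the caller: the scan stays unbounded.
        return scan

    # Assumption of this sketch: the "expression" is already a row range.
    scan["start_row"], scan["stop_row"] = filter_expr
    return scan


def is_full_scan(scan):
    """True when neither bound is set."""
    return scan["start_row"] is None and scan["stop_row"] is None
```

If the caller never sets the filter entry, which is what is reported for spark-sql here, the model returns the unbounded scan.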
>
> ------------------ Original Message ------------------
> *From:* "Jörn Franke";<jornfranke@gmail.com>;
> *Sent:* Thursday, January 28, 2016, 9:09 PM
> *To:* "开心延年"<muyannian@qq.com>;
> *Cc:* "Julio Antonio Soto de Vicente"<julio@esbet.es>; "Maciej Bryński"<
> maciek@brynski.pl>; "Ted Yu"<yuzhihong@gmail.com>; "dev"<
> dev@spark.apache.org>;
> *Subject:* Re: Re: Spark 1.6.0 + Hive + HBase
>
> Probably a newer Hive version makes a lot of sense here - at least 1.2.1.
> What storage format are you using?
> I think the old Hive version had a bug where it always scanned all
> partitions unless you limit it in the on clause of the query to a certain
> partition (eg on date=20201119)
>
> On 28 Jan 2016, at 13:42, 开心延年 <muyannian@qq.com> wrote:
>
>
> Can anybody solve Problem 4)? Thanks.
> Problem 4)
> Spark doesn't push down predicates for HiveTableScan, which means that every
> query is a full scan.
>
>
>
> ------------------ Original Message ------------------
> *From:* "Julio Antonio Soto de Vicente";<julio@esbet.es>;
> *Sent:* Thursday, January 28, 2016, 8:09 PM
> *To:* "Maciej Bryński"<maciek@brynski.pl>;
> *Cc:* "Ted Yu"<yuzhihong@gmail.com>; "dev"<dev@spark.apache.org>;
> *Subject:* Re: Spark 1.6.0 + Hive + HBase
>
> Hi,
>
> Indeed, Hive is not able to perform predicate pushdown through an HBase
> table. Neither Hive nor Impala can.
>
> Broadly speaking, if you need to query your HBase table through a field
> other than the rowkey:
>
> A) Try to "encode" as much info as possible in the rowkey field and use it
> as your predicate, or
> B) Feel free to use another kind of storage system, or create coprocessors
> in order to build a secondary index.
>
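Option A) can be sketched in a few lines of Python. This is an illustration only: the rowkey layout (zero-padded added_date, a separator, then a session id) and the helper names are hypothetical, and a sorted list stands in for HBase's sorted rowkeys.

```python
from bisect import bisect_left


def make_rowkey(added_date, session_id):
    """Put the zero-padded date first so that key order equals date order."""
    return "%012d|%s" % (added_date, session_id)


def range_scan(sorted_keys, start_date):
    """Keys with added_date >= start_date, found by seeking to a start row."""
    start_key = "%012d" % start_date
    return sorted_keys[bisect_left(sorted_keys, start_key):]


def full_scan_filter(sorted_keys, start_date):
    """Same result, but every key is read and then filtered."""
    return [k for k in sorted_keys if int(k.split("|", 1)[0]) >= start_date]
```

Both functions return the same rows; the difference is that range_scan only touches the tail of the key space, which is what a predicate on the rowkey makes possible.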
>
> On 28 Jan 2016, at 12:56, Maciej Bryński <maciek@brynski.pl> wrote:
>
> Ted,
> You're right.
> hbase-site.xml resolved problems 2 and 3, but...
>
> Problem 4)
> Spark doesn't push down predicates for HiveTableScan, which means that every
> query is a full scan.
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#144L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#147L])
>       +- Project
>          +- Filter (added_date#141L >= 201601280000)
>             +- HiveTableScan [added_date#141L], MetastoreRelation dwh_diagnostics, sessions_hbase, None
>
>
> Is there any magic option to make this work?
>
> Regards,
> Maciek
>
> 2016-01-28 10:25 GMT+01:00 Ted Yu <yuzhihong@gmail.com>:
>
>> For the last two problems, hbase-site.xml does not seem to be on the classpath.
>>
>> Once hbase-site.xml is put on classpath, you should be able to make
>> progress.
>>
>> Cheers
>>
>> On Jan 28, 2016, at 1:14 AM, Maciej Bryński <maciek@brynski.pl> wrote:
>>
>> Hi,
>> I'm trying to run a SQL query on a Hive table that is stored in HBase.
>> I'm using:
>> - Spark 1.6.0
>> - HDP 2.2
>> - Hive 0.14.0
>> - HBase 0.98.4
>>
>> I managed to configure a working classpath, but I have the following problems:
>>
>> 1) I have a UDF defined in the Hive Metastore (FUNCS table).
>> Spark cannot use it.
>>
>>   File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o51.sql.
>> : org.apache.spark.sql.AnalysisException: undefined function dwh.str_to_map_int_str; line 55 pos 30
>>         at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
>>         at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
>>         at scala.Option.getOrElse(Option.scala:120)
>>         at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:68)
>>         at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:64)
>>         at scala.util.Try.getOrElse(Try.scala:77)
>>         at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUDFs.scala:64)
>>         at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
>>         at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
>>         at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
>>         at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:573)
>>         at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:570)
>>
>>
>> 2) When I run SQL without this function, Spark tries to connect to
>> ZooKeeper on localhost.
>> I made a tunnel from localhost to one of the ZooKeeper servers, but that's
>> not a solution.
>>
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:host.name=j4.jupyter1
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.version=1.8.0_66
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-oracle/jre
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.class.path=/opt/spark/lib/mysql-connector-java-5.1.35-bin.jar:/opt/spark/lib/dwh-hbase-connector.jar:/opt/spark/lib/hive-hbase-handler-1.2.1.spark.jar:/opt/spark/lib/hbase-server.jar:/opt/spark/lib/hbase-common.jar:/opt/spark/lib/dwh-commons.jar:/opt/spark/lib/guava.jar:/opt/spark/lib/hbase-client.jar:/opt/spark/lib/hbase-protocol.jar:/opt/spark/lib/htrace-core.jar:/opt/spark/conf/:/opt/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark/lib/datanucleus-core-3.2.10.jar:/etc/hadoop/conf/
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.compiler=<NA>
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.name=Linux
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.arch=amd64
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.version=3.13.0-24-generic
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.name=mbrynski
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.home=/home/mbrynski
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.dir=/home/mbrynski
>> 16/01/28 10:09:18 INFO ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x36079f06, quorum=localhost:2181, baseZNode=/hbase
>> 16/01/28 10:09:18 INFO RecoverableZooKeeper: Process identifier=hconnection-0x36079f06 connecting to ZooKeeper ensemble=localhost:2181
>> 16/01/28 10:09:18 INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
>> 16/01/28 10:09:18 INFO ClientCnxn: Socket connection established to localhost/127.0.0.1:2181, initiating session
>> 16/01/28 10:09:18 INFO ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x15254709ed3c8e1, negotiated timeout = 40000
>> 16/01/28 10:09:18 INFO ZooKeeperRegistry: ClusterId read in ZooKeeper is null
>>
>>
>> 3) After making the tunnel I'm getting an NPE.
>>
>> Caused by: java.lang.NullPointerException
>>         at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.getMetaReplicaNodes(ZooKeeperWatcher.java:269)
>>         at org.apache.hadoop.hbase.zookeeper.MetaRegionTracker.blockUntilAvailable(MetaRegionTracker.java:241)
>>         at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation(ZooKeeperRegistry.java:62)
>>         at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateMeta(ConnectionManager.java:1203)
>>         at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1164)
>>         at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:294)
>>         at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:130)
>>         at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:55)
>>         at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:201)
>>         ... 91 more
>>
>> Do you have any ideas on how to resolve these problems?
>>
>> Regards,
>> --
>> Maciek Bryński
>>
>>
>
>
> --
> Maciek Bryński
>
>
