spark-dev mailing list archives

From "沈阳延云云计算" <1820150...@qq.com>
Subject Re: a new FileFormat 5x~100x faster than parquet
Date Mon, 22 Feb 2016 11:47:11 GMT
mark




Ya100 is a file format that is 5x to 100x faster than Parquet.
You can get ya100 from this link: https://github.com/ycloudnet/ya100/tree/master/v1.0.8

1. We use an inverted index, so we skip the rows that we do not need.

  For example, the trade-log search SQL:

        select
  (1)     phonenum,usernick,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_day,amtlong
        from spark_txt where
  (2)     tradeid=' 2014012213870282671'
        limit 10;
 





     This SQL is composed of two parts:

     (1) part 1 returns the result, which has 9 columns;

     (2) part 2 is the filter condition, which filters by tradeid.

     Let us guess which plan is faster:

     Plan A: first read all 9 result columns, then filter by tradeid.

     Plan B: first filter by tradeid, then read the 9 result columns of the matching rows only.

     Ya100 chooses plan B.
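
To make plan B concrete, here is a minimal Python sketch of filtering through an inverted index before reading any result column. It is only an illustration under my own assumptions: the in-memory rows, the short trade ids, and the names build_inverted_index and plan_b are hypothetical and are not ya100's actual implementation.

```python
from collections import defaultdict

# Hypothetical in-memory table; column names follow the example SQL.
ROWS = [
    {"tradeid": "t1", "phonenum": 1380000001, "usernick": "a"},
    {"tradeid": "t2", "phonenum": 1380000002, "usernick": "b"},
    {"tradeid": "t1", "phonenum": 1380000003, "usernick": "c"},
]


def build_inverted_index(rows, column):
    """Map each distinct value of `column` to the row ids that hold it."""
    index = defaultdict(list)
    for row_id, row in enumerate(rows):
        index[row[column]].append(row_id)
    return index


def plan_b(rows, index, value, columns, limit):
    """Filter through the index first, then read only the matching rows."""
    row_ids = index.get(value, [])[:limit]
    return [{c: rows[i][c] for c in columns} for i in row_ids]


index = build_inverted_index(ROWS, "tradeid")
result = plan_b(ROWS, index, "t1", ["phonenum", "usernick"], limit=10)
```

Only the rows whose ids come back from the index are touched, so the cost of reading the result columns grows with the number of matches rather than with the table size.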




     Performance comparison: Ya100's index vs. Parquet








 2. Top N sort: the columns that are not sorted on are not read until the last step.

  For example, we sort by logtime:

    select
  (1) phonenum,usernick,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_day,amtlong
    from spark_txt
  (2) order by logtime desc
    limit 10;
 


  This SQL is composed of two parts:
     (1) part 1 returns the result, which has 9 columns;

     (2) part 2 is the column to sort by.

     Let us guess which plan is faster:
     Plan A: first read all 9 result columns, then sort by logtime.
     Plan B: first sort by logtime, then read the 9 result columns of the matching rows only.

     Ya100 chooses plan B.
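
The lazy read in plan B can be sketched in Python as follows. This is a toy model, not ya100 code: the columnar layout, the read counter, and the function names are assumptions made up for the illustration.

```python
import heapq

# Hypothetical columnar layout: one list per column, aligned by row id.
COLUMNS = {
    "logtime": [20160101, 20160105, 20160103, 20160102],
    "phonenum": [111, 222, 333, 444],
    "usernick": ["a", "b", "c", "d"],
}
reads = {name: 0 for name in COLUMNS}  # number of cells read per column


def read_cell(column, row_id):
    """Read one cell and record that the column was touched."""
    reads[column] += 1
    return COLUMNS[column][row_id]


def top_n_lazy(sort_column, output_columns, n):
    """Sort on one column first, then fetch the other columns for n rows."""
    row_count = len(COLUMNS[sort_column])
    # Step 1: touch only the sort column to pick the winning row ids.
    keyed = ((read_cell(sort_column, i), i) for i in range(row_count))
    winners = heapq.nlargest(n, keyed)
    # Step 2: read the remaining columns for those n rows only.
    return [{c: read_cell(c, i) for c in output_columns} for _, i in winners]


top2 = top_n_lazy("logtime", ["phonenum", "usernick"], n=2)
```

After the call, the counter shows every logtime cell was read but only two cells of each output column, which is the saving plan B is after.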




     Performance comparison: Ya100's lazy read vs. Parquet


3. We use labels instead of the original values for grouping and sorting

1) In general, data contains many repeated values, for example in the sex field or the age field.
2) Storing the original value in every row wastes a lot of storage, so we make a small modification: in addition to the original value we add a new field called the label. We sort the distinct values of a field and give each term a unique number, assigned in order from first to last.
3) We use this number (which we call the label) instead of the original value. Labels are stored with a fixed length, so the file supports random reads.
4) The order of the labels is the same as the dictionary order of the values, so calculations such as order by or group by only need to read the labels, not the original values.
5) Some fields, such as the sex field, have only 2 distinct values, so we use only 2 bits (not 2 bytes) to store the label, which saves a lot of disk I/O. When all of the calculation is finished, we translate the labels back to the original values through a dictionary.
6) If many rows have the same original value, that value is stored only once and read only once.

Problems this solves:
1) ya100's data is quite big, and we do not have enough memory to load all values.
2) In real-time mode the data changes frequently, so a cache would be invalidated frequently by appends or updates, and rebuilding the cache would take a lot of time and I/O.
3) The original value is a string. When sorting or grouping, string values need a lot of memory and a lot of CPU time to compute hash codes, compare, and test equality, whereas a label is a number and is fast.
4) The label is a number; its type may be a byte, a short, or an integer, depending on the maximum label value.
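
The labelling idea above can be sketched in a few lines of Python. This is an illustration of order-preserving dictionary encoding in general, not ya100's on-disk format; build_labels and label_width_bytes are hypothetical names, and the width rule assumes unsigned labels stored in 1, 2, or 4 bytes.

```python
def build_labels(values):
    """Give each distinct value a label equal to its rank in sorted order."""
    dictionary = sorted(set(values))          # label -> original value
    label_of = {v: i for i, v in enumerate(dictionary)}
    labels = [label_of[v] for v in values]    # one small integer per row
    return dictionary, labels


def label_width_bytes(distinct_count):
    """Smallest of 1, 2, or 4 bytes able to hold every label (assumption)."""
    for width in (1, 2, 4):
        if distinct_count <= 1 << (8 * width):
            return width
    raise ValueError("too many distinct values for a 4-byte label")


sex = ["female", "male", "male", "female", "male"]
dictionary, labels = build_labels(sex)

# Sorting row ids by label gives the same order as sorting by the strings.
order_by_label = sorted(range(len(sex)), key=lambda i: labels[i])
order_by_value = sorted(range(len(sex)), key=lambda i: sex[i])

# The dictionary is consulted only at the end, to show the real values.
decoded = [dictionary[label] for label in labels]
```

Because labels are assigned in sorted order, comparing two labels is equivalent to comparing the two strings, so the sort and group steps never need the strings themselves.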

Two-phase search
Original approach:
1) group by and order by use the original value. The real value may be a string and may be large, so reading it may need a lot of I/O.
2) Comparing strings is slower than comparing integers.
Our improvement:
1) We split one search into a multi-phase search.
2) The first search only touches the fields used for order by and group by.
3) The first search does not need to read the original (real) value; it only reads the docid and the label needed for order by and group by.
4) When all of the order by and group by work is finished, we may only need to return the top N records, so we start the next search to fetch the original values of just those top N records.
Problems this solves:
1) It reduces disk I/O; reading the original values takes a lot of disk I/O.
2) It reduces network I/O (for the merge).
3) Most fields have repeated values, and a repeated value only needs to be read once. For a group by field, the original value is read only once per label, when it is displayed to the user.
4) Most searches only need to display the top N (N<=100) results, so most of the original values can be skipped.
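
A rough Python sketch of the two phases for a group by with a top N cut-off is below. The data, the tie-breaking rule, and the names phase_one and phase_two are my own assumptions for illustration; the real system works on index files rather than Python lists.

```python
from collections import Counter

# Hypothetical per-field storage: a sorted dictionary plus one label per doc.
DICTIONARY = ["Beijing", "Liaoning", "Shanghai"]   # label -> original value
DOC_LABELS = [1, 0, 1, 2, 1, 0]                    # docid -> label


def phase_one(doc_labels, n):
    """Group and rank using labels only; no original value is read."""
    counts = Counter(doc_labels)
    # Count descending, then label ascending so ties resolve predictably.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


def phase_two(ranked, dictionary):
    """Translate only the surviving top-n labels back to original values."""
    return [(dictionary[label], count) for label, count in ranked]


ranked = phase_one(DOC_LABELS, n=2)
top = phase_two(ranked, DICTIONARY)
```

Phase one moves only integers around, which is what would be shipped over the network for the merge; the dictionary lookup in phase two happens once per returned group.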











 
How to install ya100 
 
 
1) Add the dependency jar

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/data/ycloud/spark_ydb/ya100.jar

You can get the ya100 jar from this link:

https://github.com/ycloudnet/ya100/blob/master/v1.0.8/ya100-1.0.8.jar

2) Start the thriftserver

./start-thriftserver.sh --master yarn-client --executor-memory 990m --executor-cores 2 --num-executors 16

Our suggested Spark version is 6.
 
3) Configure the ya100 functions
 
create  function Yfilter as 'cn.net.ycloud.ydb.handle.fun.Yfilter';   
 
create  function Ytop10000 as 'cn.net.ycloud.ydb.handle.fun.Ytop10000';  
 
create  function Ycombine as 'cn.net.ycloud.ydb.handle.fun.Ycombine'; 
 
create  function Ycount as 'cn.net.ycloud.ydb.handle.fun.Ycount';  
 
create  function Ymax as 'cn.net.ycloud.ydb.handle.fun.Ymax'; 
 
create  function Ymin as 'cn.net.ycloud.ydb.handle.fun.Ymin';   
 
create  function Yavg as 'cn.net.ycloud.ydb.handle.fun.Yavg';    
 
create  function Ysum as 'cn.net.ycloud.ydb.handle.fun.Ysum'; 
 
create  function Ymaxstring as 'cn.net.ycloud.ydb.handle.fun.Ymaxstring'; 
 
create  function Yminstring as 'cn.net.ycloud.ydb.handle.fun.Yminstring';
 
 
Data types supported by YA100

How to create a ya100 table in Spark SQL
 
CREATE external table spark_ya100(
 phonenum bigint, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double, amtlong int, content string, ydbpartion string, ya100_pipe string
) partitioned by (dt string)
STORED BY 'cn.net.ycloud.ydb.handle.Ya100StorageHandler'
LOCATION '/data/ydb/shu_ya100'
TBLPROPERTIES(
"ya100.handler.table.name"="ydb_example_shu",
"ya100.handler.schema"="phonenum long,usernick string,ydb_sex string,ydb_province string,ydb_grade string,ydb_age string,ydb_blood string,ydb_zhiye string,ydb_earn string,ydb_prefer string,ydb_consume string,ydb_day string, amtdouble double,amtlong int,content textcjk,ydbpartion string,ya100_pipe string"
)
 
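Note: ydbpartion is the partition field of ydb and ya100_pipe is the pipe field of ya100. Both must be created, otherwise some features will be restricted.

ya100.handler.table.name is a reserved key; its value must differ from one table to another.

ya100.handler.schema gives the data types of the built-in index; the names must match the Spark column names.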

 

 
How to import data into a ya100 table


 

1) Set up the import parameters
 
set hive.mapred.supports.subdirectories=true; 
 
set mapred.min.split.size=2147483648;
 
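Note:

What ya100 finally generates is not a single file but a directory, and the index lives inside that directory. The subdirectories parameter must therefore be set so that Spark supports subdirectories; otherwise importing data into the table fails with an error.

set mapred.min.split.size=2147483648 controls the number of map tasks, to keep the number of generated index files from growing too large.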

 
 
 
2) Begin the import

insert into table spark_ya100 partition (dt='1200million')
 select phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,amtdouble,amtlong,content,2,1
 from spark_txt where dt='1200million';
 
 
How to use ya100 to filter data 
 
 
1) Basic example 1
 
set ya100.spark.filter.ydb_example_shu=ydb_sex='女' or ydb_province='辽宁' or ydb_day>='20151217';

 
Sometimes you will run into escaping problems. In that case the value can be URL-encoded.

For example:
 
set ya100.spark.filter.ydb_example_shu=ydbpartion%3D%2720151110%27+and+%28ydb_sex%3D%27%E5%A5%B3%27+or+ydb_province%3D%27%E8%BE%BD%E5%AE%81%27+or+ydb_day%3E%3D%2720151217%27%29;
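
If you need to produce such an encoded value yourself, one possible way is Python's standard urllib.parse.quote_plus, which yields the same string as the example above. This is just a convenience sketch; the message does not say which tool the authors use, and the variable names are mine.

```python
from urllib.parse import quote_plus, unquote_plus

# The plain filter condition behind the URL-encoded example.
condition = (
    "ydbpartion='20151110' and (ydb_sex='女' or "
    "ydb_province='辽宁' or ydb_day>='20151217')"
)

# quote_plus encodes spaces as '+' and other reserved characters as %XX,
# using UTF-8 for the non-ASCII characters.
encoded = quote_plus(condition)
statement = "set ya100.spark.filter.ydb_example_shu=" + encoded + ";"

# Decoding gives back the original condition.
decoded = unquote_plus(encoded)
```

Encoding the whole condition this way avoids having to escape quotes, parentheses, or non-ASCII characters by hand.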
 
 
 
2) Filter example 2
 
set ya100.spark.filter.ydb_example_shu=ydb_sex='女' or ydb_province='辽宁' or ydb_day>='20151217';

 
select * from spark_ya100 where dt='1200million' limit 10;
 
 
 
 
 
 
Other filter examples

1) Equal

qq='165162897'

2) in

For example: indexnum in (1,2,3)

3) >, <, >=, <=

clickcount >=10 and clickcount <=11

4) Range

indexnum like '({0 TO 11}) '      excludes the boundary values

indexnum like '([10 TO 11] ) '    includes the boundary values

5) Not equal

label<>'l_14' and label<>'l_15'

6) Combined conditions

indexnum='1' or indexnum='2' or (clickcount >=5 and clickcount <=11)
 
 
 



 
How to use ya100 for a top N sort
 
set ya100.spark.top10000.ydb_example_shu=ydb_age desc limit 10; 
 
set ya100.spark.top10000.ydb_example_shu=ydb_sex desc,ydb_province limit 100; 
 
set ya100.spark.top10000.ydb_example_shu=* limit 10; 
 
 
 
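Note: in the current version the maximum value of limit is 10000 (each index returns at most 1000).

Writing * as the column name means that no sorting is needed and only the first N rows are returned.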
 
 
 
SQL example  
 
 
 
set ya100.spark.top10000.ydb_example_shu=phonenum desc limit 10; 
 
select * from spark_ya100 where dt='1200million' order by phonenum desc limit 10; 
 
 
 

 
How to do group by or statistics with ya100
 
Example 1 
 
set ya100.spark.combine.ydb_example_shu=*,ydb_age; 
 
select ydb_province,Ycount('*',ya100_pipe), Ycount('ydb_age',ya100_pipe), Ymaxstring('ydb_age',ya100_pipe), Yminstring('ydb_age',ya100_pipe)
 from spark_ya100  group by ydb_province limit 10
 
  
 
Example 2 
 
set ya100.spark.combine.ydb_example_shu=amtlong; 
 
select ydb_sex, ydb_province,Ysum('amtlong',ya100_pipe) as cnt from spark_ya100  group by ydb_sex, ydb_province  limit 10
 
 
 

 
Other SQL EXAMPLE


 
•EXAMPLE COUNT(*) 
 
set ya100.spark.filter.ydb_example_shu=phonenum='13870282671'  and usernick='江峻熙'; 
 
set ya100.spark.combine.ydb_example_shu=*; 
 
set ya100.spark.top10000.ydb_example_shu=; 
 
select Ycount('*',ya100_pipe) from spark_ya100 where dt='100million' limit 10; 
 •TOP N SORT EXAMPLE

set ya100.spark.filter.ydb_example_shu= amtlong like '([1090 TO 1100] )' and amtdouble like '([1090 TO 1100] )';
 
set ya100.spark.combine.ydb_example_shu=; 
 
set ya100.spark.top10000.ydb_example_shu=phonenum desc limit 10; 
 
select * from spark_ya100 where dt='100million' order by phonenum desc limit 10;
 
 •EXAMPLE SUM(amtlong)

set ya100.spark.filter.ydb_example_shu= amtlong like '([1090 TO 1100] )' and amtdouble like '([1090 TO 1100] )';
 
set ya100.spark.combine.ydb_example_shu=amtlong; 
 
set ya100.spark.top10000.ydb_example_shu=; 
 
select Ycount('amtlong',ya100_pipe), Ysum('amtlong',ya100_pipe), Yavg('amtlong',ya100_pipe), Ymax('amtlong',ya100_pipe), Ymin('amtlong',ya100_pipe) from spark_ya100 where dt='100million' limit 10;
 
 •Top n sort 
 
set ya100.spark.filter.ydb_example_shu= amtlong like '([1090 TO 1100] )' and amtdouble like '([1090 TO 1100] )';
 
set ya100.spark.combine.ydb_example_shu=; 
 
set ya100.spark.top10000.ydb_example_shu= ydb_age desc, ydb_sex; 
 
select ydb_sex, ydb_age from spark_ya100 where dt='100million' order by ydb_age desc, ydb_sex limit 10;
 •Group by 
 
set ya100.spark.filter.ydb_example_shu=; 
 
set ya100.spark.combine.ydb_example_shu=ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,amtlong;

 
set ya100.spark.top10000.ydb_example_shu=; 
 
select ydb_sex,ydb_province,ydb_grade, Ymaxstring('ydb_age',ya100_pipe),Ymaxstring('ydb_blood',ya100_pipe),Ymaxstring('ydb_zhiye',ya100_pipe),Ymaxstring('ydb_earn',ya100_pipe),Ymaxstring('ydb_prefer',ya100_pipe),Ymaxstring('ydb_consume',ya100_pipe),Ymaxstring('ydb_day',ya100_pipe),Ysum('amtlong',ya100_pipe) as cnt
from spark_ya100 where dt='100million' group by ydb_sex,ydb_province,ydb_grade
order by cnt desc limit 10
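
Each example above sets the same three properties (filter, combine, top10000) before running the query, and leaves a property empty when it is not used. If you generate these statements from a script, a small helper along the following lines may be handy. It is only a sketch: ya100_settings is a hypothetical name, and it simply reproduces the statement shapes shown in this message.

```python
def ya100_settings(table, filter_expr="", combine="", top=""):
    """Return the three `set` statements issued before a ya100 query.

    An unused property is written with an empty value, mirroring the
    examples in this message.
    """
    prefix = "set ya100.spark."
    return [
        f"{prefix}filter.{table}={filter_expr};",
        f"{prefix}combine.{table}={combine};",
        f"{prefix}top10000.{table}={top};",
    ]


# Settings for the COUNT(*) example.
statements = ya100_settings(
    "ydb_example_shu",
    filter_expr="phonenum='13870282671'  and usernick='江峻熙'",
    combine="*",
)
```

Always emitting all three statements also makes sure a value left over from an earlier query in the same session is overwritten.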
 

 
Connect Ya100 to ydb for real-time data
CREATE external  table spark_ydb(
 phonenum bigint, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double, amtlong int, content string, ydbpartion string, ya100_pipe string
)
STORED BY 'cn.net.ycloud.ydb.handle.Ya100StorageHandler'
LOCATION  '/data/ydb/shu_ydb'
TBLPROPERTIES(
"ya100.handler.table.name"="ydb_example_shu",
"ya100.handler.master"="101.200.130.48:8080",
"ya100.handler.columns.mapping"="phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,amtdouble,amtlong,content,ydbpartion,ya100_pipe")

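Note:

ydbpartion is the partition field of ydb and must be specified when querying, whereas the Spark table itself has no partition field.

ya100_pipe is the pipe field of ya100. Both fields must be created, otherwise some features will be restricted.

ya100.handler.table.name is the name of the table inside the ydb system.

ya100.handler.columns.mapping is the mapping between the table inside the ydb system and the Spark table.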