On 2015-01-29 19:59:09, "yannianmu(母延年)" <firstname.lastname@example.org> wrote:
Dear Lucene dev,
We are from the Hermes team. Hermes is a project based on Lucene 3.5 and Solr 3.5.
Hermes processes 100 billion documents per day and holds 2000 billion documents in total (two months of data). Our largest single cluster currently has an index size of over 200 TB, and the total size is 600 TB. We use Lucene to speed up a big data warehouse and reduce analysis response time, for example for filters such as age=32 and keywords like 'lucene', or for operations such as count, sum, order by, group by and so on.
Hermes can filter 1000 billion records in 1 second. An order by over 10 billion records takes 10 s, a group by over 10 billion records takes 15 s, and sum, avg, max, min statistics over 10 billion records take 30 s.
For these purposes we made many improvements on top of Lucene and Solr. Lucene has changed a lot as of version 4.10 and the code base is very different, so we do not plan to commit our code to Lucene. We only want to introduce our improvements based on Lucene 3.5 and explain how Hermes can process 100 billion documents per day on 32 physical machines. We think it may be helpful for people who have similar needs.
First-level index (tii): loading on demand
1. The .tii file is loaded into RAM by TermInfosReaderIndex.
2. This can be quite slow when an index is first opened.
3. The index has to be kept open persistently: once opened, it is never closed.
4. This limits the number of indexes. With thousands of indexes, it becomes impossible.
1. Load on demand: not all fields need to be loaded into memory.
2. We modified the getIndexOffset method (a binary search) to work on disk instead of in memory, and we use an LRU cache to speed it up.
3. Running getIndexOffset on disk saves a lot of memory and reduces the time needed to open an index.
4. Hermes often opens different indexes for different businesses. When an index is not used often, we close it (managed by LRU).
5. In this way one physical machine can store more than 100000 indexes.
Problems solved:
1. Hermes needs to store more than 1000 billion documents, and we do not have enough memory to hold the tii files.
2. We have more than 100000 indexes. If all of them were open, that would waste a lot of file descriptors, more than the file system allows.
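The "close indexes that are not used often, managed by LRU" idea can be sketched as below. This is only an illustration under assumptions: LruIndexPool, its opener function and the maxOpen limit are hypothetical names, and a real handle would wrap something like an IndexReader rather than a plain Closeable.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical pool of open index handles; the least recently used handle is closed on overflow. */
final class LruIndexPool<H extends Closeable> {
    private final int maxOpen;
    private final Function<String, H> opener;
    private final LinkedHashMap<String, H> open;

    LruIndexPool(int maxOpen, Function<String, H> opener) {
        this.maxOpen = maxOpen;
        this.opener = opener;
        // accessOrder=true keeps entries ordered from least to most recently used.
        this.open = new LinkedHashMap<String, H>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, H> eldest) {
                if (size() <= LruIndexPool.this.maxOpen) {
                    return false;
                }
                try {
                    eldest.getValue().close(); // release file descriptors of the evicted index
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return true;
            }
        };
    }

    /** Returns an open handle, opening the index on demand. */
    synchronized H get(String indexName) {
        H handle = open.get(indexName);
        if (handle == null) {
            handle = opener.apply(indexName);
            open.put(indexName, handle);
        }
        return handle;
    }

    synchronized int openCount() {
        return open.size();
    }
}
```

With maxOpen set to the number of indexes a machine can afford to keep open, rarely used indexes are closed automatically while hot ones stay open.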
Building the index on HDFS
1. We modified the Lucene 3.5 code in 2013 so that we can build indexes directly on HDFS (Lucene has supported HDFS since 4.0).
2. All offline data is built by MapReduce on HDFS.
3. We moved all realtime indexes from local disk to HDFS.
4. We can ignore disk failures because the index is on HDFS.
5. We can move a process from one machine to another because the index is on HDFS.
6. We can quickly recover an index when a disk failure happens.
7. We do not need to recover data when a machine breaks (the index is so big that moving it takes many hours). The process can quickly move to another machine, driven by the ZooKeeper heartbeat.
8. Everyone knows that an index on HDFS is slower than on a local file system, but why? On a local file system the OS applies many optimizations and uses a lot of cache to speed up random access, so we need similar optimization on HDFS. When people say that an HDFS index is slow, the reason is that they did not optimize it.
9. We split each HDFS file into fixed-length blocks of 1 KB and keep the blocks in an LRU cache. This speeds up the tii file and frequently used terms.
10. Some HDFS files do not need to be closed immediately, so we keep them in an LRU cache to reduce how often files are opened.
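The fixed-length block cache from item 9 could look roughly like this. It is a minimal sketch, not the Hermes code: BlockSource is a hypothetical stand-in for positional reads on an HDFS input stream, and CachedBlockReader and its misses counter exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical positional reader, standing in for an HDFS input stream. */
interface BlockSource {
    /** Copies up to len bytes starting at offset into dst and returns the number copied. */
    int readAt(long offset, byte[] dst, int len);
}

/** Reads single bytes through an LRU cache of fixed 1 KB blocks. */
final class CachedBlockReader {
    static final int BLOCK_SIZE = 1024;

    private final BlockSource source;
    private final LinkedHashMap<Long, byte[]> cache;
    int misses = 0; // number of blocks fetched from the source

    CachedBlockReader(BlockSource source, final int maxBlocks) {
        this.source = source;
        // accessOrder=true turns the map into an LRU structure.
        this.cache = new LinkedHashMap<Long, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, byte[]> eldest) {
                return size() > maxBlocks;
            }
        };
    }

    /** Returns the byte at pos; the caller must keep pos inside the file. */
    byte readByte(long pos) {
        long blockNo = pos / BLOCK_SIZE;
        byte[] block = cache.get(blockNo);
        if (block == null) {
            block = new byte[BLOCK_SIZE];
            source.readAt(blockNo * BLOCK_SIZE, block, BLOCK_SIZE);
            cache.put(blockNo, block);
            misses++;
        }
        return block[(int) (pos % BLOCK_SIZE)];
    }
}
```

Repeated reads of hot regions, such as the tii data or frequent terms, are then served from memory, and only cold blocks go to the underlying file system.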
Improving Solr so that one core can dynamically process multiple indexes
1. A Solr core (one process) handles 1~N indexes, as set by the Solr config.
2. We use partitions like Oracle or Hadoop Hive: instead of building one big index, we build many indexes by day (or month, year, or another partition key).
3. Tables are created dynamically for dynamic business needs.
Problems solved:
1. An index can grow too big, beyond Integer.MAX_VALUE documents, which overflows the docid.
2. Sometimes a search does not need all of the data, for example only the most recent 3 days.
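As a small illustration of why day partitions help, the sketch below selects only the partitions for the most recent days. The naming scheme table_yyyyMMdd and the DayPartitions class are assumptions made for this example, not something Hermes is known to use.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that maps a date range to per-day index names. */
final class DayPartitions {
    // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd, for example 20140921.
    private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;

    /** Returns the partition names for the given day and the days before it, newest first. */
    static List<String> partitionsFor(String table, LocalDate newestDay, int days) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < days; i++) {
            names.add(table + "_" + newestDay.minusDays(i).format(DAY));
        }
        return names;
    }
}
```

A query over the last 3 days then touches 3 small indexes instead of one index that holds every document, and each partition keeps its own docid space.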
Label mark technology for doc values
1. Statistics methods such as group by, sort, sum, max, min and avg need to read the original values from the tis file.
2. FieldCacheImpl loads all term values into memory for the Solr fieldValueCache, even if we only compute statistics over one record.
3. The first search is quite slow because it has to build the fieldValueCache and load all term values into memory.
1. In general the data has many repeated values, for example the sex field or the age field.
2. If we stored the original values, that would waste a lot of storage.
So we made a small modification to TermInfosWriter and added a new field called termNumber.
TermInfosWriter produces the unique terms sorted by term, and we give each term a unique number from beginning to end (much like Solr's UnInvertedField).
3. We use the termNum (which we call a label) instead of the Term. We store the termNum (label) in a file called doctotm. The doctotm file is ordered by docid and each label is stored with a fixed length, so the file can be read by random access (like fdx, which is also fixed length) and does not need to be loaded entirely into memory.
4. The order of the labels is the same as the order of the terms, so for calculations such as order by or group by we only read the labels and do not need the original values.
5. Some fields, like the sex field, have only 2 different values, so we use only 2 bits (not 2 bytes) to store the label. This saves a lot of disk IO.
6. When all calculations are finished, we translate labels back to Terms through a dictionary.
7. If many rows have the same original value, we store the original value only once and read it only once.
Problems solved:
1. Hermes's data is very large, and we do not have enough memory to load all values into memory as Lucene's FieldCacheImpl or Solr's UnInvertedField do.
2. In realtime mode the data changes frequently, so the cache is frequently invalidated by appends or updates. Rebuilding FieldCacheImpl takes a lot of time and IO.
3. The original value is a Lucene Term, which is a string. When sorting or grouping, string values need a lot of memory and a lot of CPU time for hashCode, compare and equals, whereas a label is a number and is fast.
4. The label is a number. Its type may be byte, short or integer, depending on the maximum label value.
5. Reading the original value needs a lot of IO because we have to iterate over the tis file, even when we only need to read a single document.
6. It removes the long time spent building FieldCacheImpl on first use.
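The label idea can be sketched in memory as follows. This is an assumption-laden illustration: LabelColumn is a hypothetical class, the packed long array stands in for the fixed-width doctotm file, and the sketch picks the smallest bit width that fits the number of distinct values, which may differ from the width Hermes actually uses.

```java
import java.util.Arrays;
import java.util.TreeSet;

/** Hypothetical stand-in for the term dictionary plus the fixed-width doctotm file. */
final class LabelColumn {
    private final String[] dictionary; // label -> term, in sorted term order
    private final long[] packed;       // labels by docid, bitsPerLabel bits each
    private final int bitsPerLabel;

    LabelColumn(String[] valuesByDocId) {
        // Sorted unique terms; a term's position in this array is its label.
        dictionary = new TreeSet<>(Arrays.asList(valuesByDocId)).toArray(new String[0]);
        bitsPerLabel = Math.max(1, 32 - Integer.numberOfLeadingZeros(dictionary.length - 1));
        packed = new long[(int) (((long) valuesByDocId.length * bitsPerLabel + 63) / 64)];
        for (int doc = 0; doc < valuesByDocId.length; doc++) {
            set(doc, Arrays.binarySearch(dictionary, valuesByDocId[doc]));
        }
    }

    private void set(int doc, long label) {
        long bitPos = (long) doc * bitsPerLabel;
        int word = (int) (bitPos >>> 6);
        int shift = (int) (bitPos & 63);
        packed[word] |= label << shift;
        if (shift + bitsPerLabel > 64) { // the label straddles two words
            packed[word + 1] |= label >>> (64 - shift);
        }
    }

    /** Random access by docid; only the label is read, never the term. */
    int label(int doc) {
        long bitPos = (long) doc * bitsPerLabel;
        int word = (int) (bitPos >>> 6);
        int shift = (int) (bitPos & 63);
        long value = packed[word] >>> shift;
        if (shift + bitsPerLabel > 64) {
            value |= packed[word + 1] << (64 - shift);
        }
        return (int) (value & ((1L << bitsPerLabel) - 1));
    }

    /** Translates a label back to the original term, done once at the end of a calculation. */
    String term(int doc) {
        return dictionary[label(doc)];
    }

    int bitsPerLabel() {
        return bitsPerLabel;
    }
}
```

Because labels follow term order, sorting or grouping can compare the small integers returned by label() and call term() only for the rows that are finally shown.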
1. Group by and order by use the original value. The real value may be a string and may be large, and reading it may need a lot of IO because the tis and frq files must be read.
2. Comparing strings is slower than comparing integers.
1. We split one search into a multi-phase search.
2. In the first phase we only search the fields used for order by and group by.
3. In the first phase we do not need to read the original value (the real value). We only read the docid and label (see <Label mark technology for doc values>) for order by and group by.
4. When all order by and group by work is finished, we may only need to return the top n records, so we start the next phase to fetch the original values of those top n records.
Problems solved:
1. Reduces IO: reading original values takes a lot of disk IO.
2. Reduces network IO (for the merger).
3. Most fields have repeated values, and a repeated value only needs to be read once.
A group by field only needs to read the original value once per label when it is displayed to the user.
4. Most searches only need to display the top n (n<=100) results, so with a two-phase search the original values of the other records can be skipped.
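A minimal sketch of the two phases, assuming labels are already available per docid: phase one ranks by label only, and phase two loads original values for just the top n docids. TwoPhaseTopN and the loadOriginal callback are hypothetical names for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.IntFunction;

final class TwoPhaseTopN {
    /** Phase 1: returns the n docids with the largest labels, in descending label order. */
    static int[] topDocsByLabel(int[] labelByDocId, int n) {
        // Min-heap on label: the root is always the weakest of the current candidates.
        Comparator<Integer> byLabel = Comparator.comparingInt((Integer d) -> labelByDocId[d]);
        PriorityQueue<Integer> heap = new PriorityQueue<>(byLabel);
        for (int doc = 0; doc < labelByDocId.length; doc++) {
            heap.add(doc);
            if (heap.size() > n) {
                heap.poll(); // drop the docid with the smallest label
            }
        }
        int[] top = new int[heap.size()];
        for (int i = top.length - 1; i >= 0; i--) {
            top[i] = heap.poll();
        }
        return top;
    }

    /** Phase 2: loads the original value only for the docids that survived phase 1. */
    static List<String> fetchValues(int[] docIds, IntFunction<String> loadOriginal) {
        List<String> values = new ArrayList<>();
        for (int doc : docIds) {
            values.add(loadOriginal.apply(doc));
        }
        return values;
    }
}
```

The expensive lookup runs n times instead of once per matching document, which is where the savings in disk and network IO come from.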
1. Hermes does not update the index one document at a time. It uses batch indexing.
2. The index is split into four areas: doclist => buffer index => ram index => diskIndex/hdfsIndex.
3. The doclist only stores the SolrInputDocument objects for a batch update or append.
4. The buffer index is a RAMDirectory used to merge the doclist into an index.
5. The ram index is also a RAMDirectory, but it is bigger than the buffer index and can be searched by the user.
6. The disk/HDFS index is persistent storage used for the big index.
7. We also use a WAL called binlog (like the MySQL binlog) for recovery.
Two-phase commit for updates
1. We do not update records one at a time as Solr does (Solr searches by term, finds the document, deletes it, and then appends a new one). One at a time is slow.
2. We need an atomic increment field, which Solr cannot support: Solr only supports replacing a field value.
An atomic increment has to read the last value first and then increase it.
3. Hermes updates a document with a pre-mark delete followed by a batch commit.
4. If a document is in the pre-marked state, it can still be searched by the user until we commit.
We modified SegmentReader and split deletedDocs into 3 parts. One is called deletedDocstmp, which is for pre-marking (pending deletes). Another is called deletedDocs_forsearch, which is used for index search. The third is still called deletedDocs.
5. When we want to pending-delete a document, we set its bit in deletedDocstmp (an OpenBitSet) to mark it as pending delete.
Then we append the new value to the doclist area (buffer area).
Pending delete means the user can still search the old value.
Buffer area means the user cannot yet search the new value.
But when we commit (in a batch),
the old value is really dropped and the whole buffer area is flushed to the ram area (the ram area can be searched).
6. We call the pending delete a virtual delete, and after commit we call it a physical delete.
7. Hermes often virtually deletes many documents and then commits once, which performs much better than committing one by one.
8. We also use a lot of caching to speed up the atomic increment field.
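The pending-delete and batch-commit flow can be sketched like this. It is a simplification under assumptions: PendingUpdateIndex is a hypothetical class, it uses two java.util.BitSet instances instead of the three bitsets in the modified SegmentReader, and plain lists stand in for the buffer and ram areas.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

/** Hypothetical in-memory model of pre-mark delete plus batch commit. */
final class PendingUpdateIndex {
    private final List<String> searchable = new ArrayList<>(); // docid -> value, visible to search
    private final List<String> buffer = new ArrayList<>();     // new values, not yet visible
    private final BitSet deleted = new BitSet();               // really deleted, hidden from search
    private final BitSet pendingDelete = new BitSet();         // pre-marked, still visible

    void add(String value) {
        buffer.add(value);
    }

    /** Pre-marks the old document and buffers the new value; search results do not change yet. */
    void update(int docId, String newValue) {
        pendingDelete.set(docId);
        buffer.add(newValue);
    }

    /** Batch commit: pending deletes become real and buffered values become searchable. */
    void commit() {
        deleted.or(pendingDelete);
        pendingDelete.clear();
        searchable.addAll(buffer);
        buffer.clear();
    }

    List<String> search() {
        List<String> hits = new ArrayList<>();
        for (int doc = 0; doc < searchable.size(); doc++) {
            if (!deleted.get(doc)) {
                hits.add(searchable.get(doc));
            }
        }
        return hits;
    }
}
```

Until commit() runs, a reader sees the old value and never a state where the old value is gone and the new one is missing, and many updates share the cost of a single commit.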
Term data skew
1. Lucene uses an inverted index to store terms and their doclists.
2. Some fields, like sex, have only two values, male or female, so male will have 50% of the doclist.
3. Solr uses the filter cache to cache each FQ. A cached FQ is an OpenBitSet that stores the doclist.
4. The first time an FQ is used (not yet cached), a large doclist has to be read to build the OpenBitSet, which takes a lot of disk IO.
5. Most of the time we only need the top n of the doclist, and we do not care about sorting by score.
1. We often combine it with other FQs and use the skip list of the doclist to skip docids that are not needed (see the query method called advance).
2. We do not cache the OpenBitSet per FQ. Instead we cache blocks of the frq files in memory to speed up the places that are read often.
3. Our index is very big. If we cached the FQ (OpenBitSet), that would take a lot of memory.
4. We modified IndexSearcher to support a real top N search that ignores sorting by doc score.
Problems solved:
1. Data skew causes a lot of disk IO for reading doclists that are not needed.
2. An index of 2000 billion documents is too big, and the FQ cache (filter cache) using OpenBitSet takes a lot of memory.
3. Most searches only need the top N results and do not need sorting by score, so we need to speed up the search time.
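Skipping with advance and stopping after N hits can be illustrated on two sorted docid lists. This is a sketch only: TopNIntersect is a hypothetical class, the int arrays stand in for doclists read from the frq file, and binary search replaces the real skip list.

```java
import java.util.Arrays;

final class TopNIntersect {
    /** Returns the first index at or after from whose docid is >= target (a stand-in for advance()). */
    static int advance(int[] postings, int from, int target) {
        int i = Arrays.binarySearch(postings, from, postings.length, target);
        return i >= 0 ? i : -i - 1;
    }

    /** Intersects two strictly increasing docid lists and stops after the first n matches. */
    static int[] firstN(int[] a, int[] b, int n) {
        int[] out = new int[n];
        int count = 0;
        int i = 0;
        int j = 0;
        while (count < n && i < a.length && j < b.length) {
            if (a[i] == b[j]) {
                out[count++] = a[i];
                i++;
                j++;
            } else if (a[i] < b[j]) {
                i = advance(a, i, b[j]); // skip docids in a that cannot match
            } else {
                j = advance(b, j, a[i]); // skip docids in b that cannot match
            }
        }
        return Arrays.copyOf(out, count);
    }
}
```

With a skewed term such as sex=male, the long doclist is never read in full: the selective filter drives the skips, and the loop ends as soon as N documents are found because no score ordering is needed.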
OpenBitSet and fieldValueCache need to allocate a big long or int array. This is seen often in many caches, such as UnInvertedField, FieldCacheImpl, filterQueryCache and so on. Most of the time many of the elements are zero (empty).
1. Originally the big array is created directly, and when it is no longer needed it is left to the JVM GC.
1. We split the big array into fixed-length blocks. Each block is a small array with a fixed length of 1024.
2. If a block is almost empty (most elements are zero), we use a HashMap instead of an array.
3. If a block has no non-zero values (length=0), we do not create an array for the block at all and use null instead.
4. When a block is no longer used, we collect its array into a buffer and reuse it next time.
Problems solved:
1. Saves memory.
2. Reduces JVM garbage collection, which takes a lot of CPU resources.
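The block layout above can be sketched as follows. The class name BlockedIntArray and the SPARSE_LIMIT threshold are assumptions for illustration, and the reuse buffer for released blocks is left out to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical int array split into 1024-element blocks: null, sparse map, or dense array. */
final class BlockedIntArray {
    static final int BLOCK = 1024;
    static final int SPARSE_LIMIT = 16; // assumed threshold for switching to a dense block

    private final int[][] dense;
    private final Map<Integer, Integer>[] sparse;

    @SuppressWarnings({"unchecked", "rawtypes"})
    BlockedIntArray(int size) {
        int blockCount = (size + BLOCK - 1) / BLOCK;
        dense = new int[blockCount][];
        sparse = (Map<Integer, Integer>[]) new Map[blockCount];
    }

    int get(int index) {
        int b = index / BLOCK;
        int off = index % BLOCK;
        if (dense[b] != null) {
            return dense[b][off];
        }
        if (sparse[b] != null) {
            return sparse[b].getOrDefault(off, 0);
        }
        return 0; // an all-zero block is never allocated
    }

    void set(int index, int value) {
        int b = index / BLOCK;
        int off = index % BLOCK;
        if (dense[b] != null) {
            dense[b][off] = value;
            return;
        }
        if (sparse[b] == null) {
            if (value == 0) {
                return; // keep the empty block as null
            }
            sparse[b] = new HashMap<>();
        }
        if (value == 0) {
            sparse[b].remove(off);
        } else {
            sparse[b].put(off, value);
        }
        if (sparse[b].size() > SPARSE_LIMIT) { // no longer "almost empty": switch to an array
            int[] block = new int[BLOCK];
            for (Map.Entry<Integer, Integer> e : sparse[b].entrySet()) {
                block[e.getKey()] = e.getValue();
            }
            dense[b] = block;
            sparse[b] = null;
        }
    }

    int denseBlocks() {
        int count = 0;
        for (int[] block : dense) {
            if (block != null) {
                count++;
            }
        }
        return count;
    }
}
```

Memory then grows with the number of non-zero elements rather than with the declared size, and the JVM never has to allocate or collect one huge contiguous array.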
WeakHashMap, HashMap and synchronized problems
1. FieldCacheImpl uses a WeakHashMap to manage the field value cache, and it has a memory leak bug.
2. SolrInputDocument uses many HashMap and LinkedHashMap instances for fields, which wastes a lot of memory.
3. AttributeSource uses a WeakHashMap to cache implementation classes and uses a global synchronized block, which reduces performance.
4. AttributeSource is a base class and NumericField extends AttributeSource, so many HashMaps are created that NumericField never uses.
5. Because of all this, the JVM GC carries a heavy burden for HashMaps that are never used.
1. WeakHashMap does not perform well, so we use SoftReference instead.
2. We reuse NumericField to avoid creating AttributeSource frequently.
3. We do not use the global synchronized block.
After this optimization, our processing speed went up from 20000/s to 60000/s (1 KB per document).
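One way to combine SoftReference with a cache that has no global lock is sketched below. SoftCache is a hypothetical class, and the use of ConcurrentHashMap is a choice made for this example, not necessarily what Hermes does.

```java
import java.lang.ref.SoftReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache whose values the GC may reclaim under memory pressure. */
final class SoftCache<K, V> {
    private final ConcurrentHashMap<K, SoftReference<V>> map = new ConcurrentHashMap<>();

    /** Returns the cached value, or loads and caches it if absent or already collected. */
    V get(K key, Function<K, V> loader) {
        SoftReference<V> ref = map.get(key);
        V value = ref == null ? null : ref.get();
        if (value == null) {
            // Two threads may load the same key at once; the last put wins, which is harmless here.
            value = loader.apply(key);
            map.put(key, new SoftReference<>(value));
        }
        return value;
    }
}
```

Readers on different keys do not block each other, and a value stays cached as long as memory allows instead of depending on the lifetime of a weakly referenced key.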
Other GC optimizations
1. Reuse the byte arrays in the input buffer and output buffer.
2. Reuse the byte arrays in RAMFile.
3. Remove some finalize methods that are not necessary.
4. Use StringHelper.intern to reuse the field names in SolrInputDocument.
1. An index commit does not need to sync all the files.
2. We use a block cache on top of FSDirectory and hdfsDirectory to speed up reads.
3. We close indexes or index files that are not used often, and we also limit the maximum number of open indexes. The block cache is managed by LRU.
1. We optimized the thread pool in the SearchHandler class: sometimes a keep-alive connection is not needed, and we increased the timeout for large indexes.
2. We removed Jetty and wrote the socket handling ourselves, because importing data through Jetty does not perform well.
3. We changed data import from push mode to pull mode, similar to Apache Storm.
1. In append mode we do not store the field values in the fdt file. Doing so would cost a lot of IO on index merges, and it is not needed.
2. We store the field data in a separate file in Hadoop SequenceFile format, compressed with LZO to save IO.
3. We keep a pointer from each docid to its position in the SequenceFile.
Non-tokenized field optimization
1. For non-tokenized fields we do not store the field value in the fdt file.
2. We read the field value from the label (see <<Label mark technology for doc values>>).
3. Most fields have duplicate values, so this reduces the index file size.
Multi-level merger servers
1. Solr can only use one shard to act as the merger server.
2. We use multiple levels of merger servers to merge the results of all shards.
3. Shards on the same machine are merged with high priority by the merger server on that machine.
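A two-level merge of per-shard top-n results might be sketched like this. TreeMerger is a hypothetical class, integers stand in for sortable result keys, and the real system presumably merges richer records over the network.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class TreeMerger {
    /** Merges several result lists and keeps the n largest values, in descending order. */
    static List<Integer> mergeTopN(List<List<Integer>> results, int n) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> r : results) {
            all.addAll(r);
        }
        all.sort(Collections.reverseOrder());
        return new ArrayList<>(all.subList(0, Math.min(n, all.size())));
    }

    /** Level 1 merges the shards of each machine locally; level 2 merges the machine results. */
    static List<Integer> mergeByMachine(List<List<List<Integer>>> shardsByMachine, int n) {
        List<List<Integer>> perMachine = new ArrayList<>();
        for (List<List<Integer>> shards : shardsByMachine) {
            perMachine.add(mergeTopN(shards, n)); // no network hop: shards share the machine
        }
        return mergeTopN(perMachine, n);
    }
}
```

The top-level merger receives at most n rows per machine instead of n rows per shard, which reduces both its network traffic and its merge work.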
[Figure: Solr's merger]
[Figure: Hermes's merger]
1. Hermes supports SQL.
2. It supports union SQL across different tables.
3. It supports view tables.
Hermes SQL may look like this:
select higo_uuid,thedate,ddwuid,dwinserttime,ddwlocaltime,dwappid,dwinituserdef1,dwclientip,sclientipv6,dwserviceip,dwlocaiip,dwclientversion,dwcmd,dwsubcmd,dwerrid,dwuserdef1,dwuserdef2,dwuserdef3,dwuserdef4,cloglevel,szlogstr from sngsearch06,sngsearch09,sngsearch12 where thedate in ('20140917') and ddwuin=5713 limit 0,20
select thedate,ddwuin,dwinserttime,ddwlocaltime from sngsearch12 where thedate in ('20140921') and ddwuin=5713 order by ddwlocaltime desc limit 0,10
select count(*),count(ddwuid) from sngsearch03 where thedate=20140921 limit 0,100
select sum(acnt),average(acnt),max(acnt),min(acnt) from sngsearch03 where thedate=20140921 limit 0,100
select thedate,ddwuid,sum(acnt),count(*) from sngsearch18 where thedate in (20140908) and ddwuid=7823 group by thedate,ddwuid limit 0,100;
select count(*) from guangdiantong where thedate ='20141010' limit 0,100
select freqtype,fspenttime,fmodname,yyyymmddhhmmss,hermestime,freqid from guangdiantong where thedate ='20141010' limit 0,100
select freqtype,fspenttime,fmodname,yyyymmddhhmmss,hermestime,freqid from guangdiantong where thedate ='20141010' order by yyyymmddhhmmss desc limit 0,10
select miniute1,count(*) from guangdiantong where thedate ='20141010' group by miniute1 limit 0,100
select miniute5,count(*) from guangdiantong where thedate ='20141010' group by miniute5 limit 0,100
select hour,miniute15,count(*) from guangdiantong where thedate ='20141010' group by hour,miniute15 order by miniute15 desc limit 0,100
select hour,count(*),sum(fspenttime),average(fspenttime),average(ferrorcode) from guangdiantong where thedate ='20141010' and freqtype=1 group by hour limit 0,100
select freqtype,count(*),sum(fspenttime),average(fspenttime) from guangdiantong where thedate ='20141010' and (freqtype>=10000 and freqtype<=10100) group by freqtype limit 0,100
select freqtype,count(*),sum(fspenttime),average(fspenttime) from guangdiantong where thedate ='20141010' and (freqtype>=10000 and freqtype<=10100) group by freqtype order by average(fspenttime) desc limit 0,100
select hour,miniute15,count(*),sum(fspenttime),average(fspenttime) from guangdiantong where thedate ='20141010' group by hour,miniute15 order by miniute15 desc limit 0,100
select thedate,yyyymmddhhmmss,miniute1,miniute5,miniute15,hour,hermestime,freqtype,freqname,freqid,fuid,fappid,fmodname,factionname,ferrorcode,ferrormsg,foperateret,ferrortype,fcreatetime,fspenttime,fserverip,fversion from guangdiantong where thedate ='20141010' order by yyyymmddhhmmss desc limit 0,100