phoenix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Samarth Jain (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PHOENIX-2126) Improving performance of merge sort by multi-threaded and minheap implementation
Date Wed, 23 Sep 2015 05:26:04 GMT

    [ https://issues.apache.org/jira/browse/PHOENIX-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903978#comment-14903978
] 

Samarth Jain commented on PHOENIX-2126:
---------------------------------------

[~ankit.singhal] - Have you experimented with using a single heap instead of creating buckets.size()
heaps? It would be good to understand what benefit you are getting out of using n threads
operating on n buckets vs a single thread picking up the next iterator using a min/max heap.
For K salt buckets, with each bucket returning N tuples, you would do O(NlogK) comparisons
to sort.

I think the real benefit you are deriving from doing things in multiple threads is because
you are pulling everything from HBase to client memory and sorting each bucket in parallel.
However, that won't scale when there are large number of rows returned which might cause client
to go OOM.

Also, it looks like your prepareIterators method is assigning every iterator to bucket 0 once
the index i >= nThreads. Why not just assign the iterator to the next bucket? Probably
better to do k = k % nThreads?
{code}
Map<Integer, List<PeekingResultIterator>> buckets=new HashMap<Integer, List<PeekingResultIterator>>();
+            int k=0;
+            for(int i=0;i<iterators.size();i++){
+                if(i>=NTHREDS){
+                    k=0;
+                }
+                List<PeekingResultIterator> list = buckets.get(k);
+                if(list==null){
+                    list=new ArrayList<PeekingResultIterator>();
+                    buckets.put(k, list);
+                }
+                list.add(iterators.get(i));
+                k++;
+            }
{code}

Minor nit: Rename NTHREDS to nThreads.

> Improving performance of merge sort by multi-threaded and minheap implementation
> --------------------------------------------------------------------------------
>
>                 Key: PHOENIX-2126
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2126
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Ankit Singhal
>         Attachments: PHOENIX-2126_v1.0.patch, PHOENIX-2126_v2.0.patch
>
>
> {code}
> CREATE TABLE IF NOT EXISTS test (
> dim1 INTEGER NOT NULL,
> A.B INTEGER,
> A.M DECIMAL,
> CONSTRAINT PK PRIMARY KEY
> (dim1))
> SALT_BUCKETS =256,DEFAULT_COLUMN_FAMILY='A';
> {code}
> *Query to benchmark:-*
> {code}
> select dim1,sum(b),sum(m) from test where Datemth>=201505 and Datemth<=201505 and
dim1 IS NOT NULL  group by dim1 order by sum(m) desc nulls last limit 10;
> {code}
> *current scenario:-*
> *CASE 1: * consider the case when dim1 is high cardinality attribute (10K+) and table
have salt bucket set to 256, we will get 256 iterators from above query at the client and
MergeSortRowKeyResultIterator has to merge these 256 iterators with single thread. So let's
say each iterator has 10k tuples returned, then merge sort needs to merge 2.5M tuples which
will be costly if it is done with single thread and the query spend most of its time on client
> *CASE 2: * consider the case when dim1 is high cardinality attribute (10K+) and table
have salt bucket set to 1, we will get 1 iterator from  above query at the client and MergeSortRowKeyResultIterator
doesn't need to merge anything. Here, it is fine with single thread.
> *CASE 3: * consider the case when dim1 is low cardinality attribute (10-100) and table
have salt bucket set to 256, we will get 256 iterator from  above query at the client and
MergeSortRowKeyResultIterator has to merge these 256 iterators with single thread. here the
single thread is also fine as he has to merge only 2560 tuples.
> *Solution for case1 problem is:-*
> Optimized the implementation of merging 'n'-sorted iterators(having 'm' tuples)  by using
"min heap" which optimizes the time complexity from 
> O(n2m) to O(nmLogn) (as heapify takes (Logn) time).
> And, By using multiple-threads('t') to process group of iterators which further optimized
the complexity to 
> T(nm)=T(nm)/t+T(t)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message