incubator-blur-user mailing list archives

From Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com>
Subject Re: new queue capability
Date Tue, 11 Mar 2014 08:08:15 GMT
Hi Aaron,

I am finally able to test the new queue capabilities. The results are
promising compared to the traditional thrift client. Here are the details.

I ran the tests on an AWS EMR cluster of 3 m1.large nodes. For all the
tests, environment and memory settings were left at their defaults.

*Test 1*: Without the queue feature (using client.mutate), indexing 10,000
records took almost 300 seconds, a rate of *33 records/sec*.

*Test 2*: With the queue feature (client.enqueueMutate), the same 10,000
records on the same cluster took just 23 seconds, a rate of
*435 records/sec, a jump of 13 times*.
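The only client-side difference between the two tests is the thrift method
invoked. A minimal sketch of the write loop (Java-flavored pseudocode; the
connection string and the buildRowMutations helper are illustrative
placeholders, only mutate/enqueueMutate come from the API discussed in this
thread):

```
// Sketch only: "controller1:40010" and buildRowMutations(...) are
// placeholders for illustration, not part of the Blur API.
Blur.Iface client = BlurClient.getClient("controller1:40010");
for (RowMutation m : buildRowMutations(10000)) {
    client.mutate(m);           // Test 1: synchronous write path
    // client.enqueueMutate(m); // Test 2: queued write path (one-line change)
}
```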

*Test 3*: I then tried to index around 90,000 documents on the same
cluster, again using client.enqueueMutate. This time I got an error in the
log and all shard servers hung. The log lines below show the error; after
that, everything hung and the shards became unresponsive.

org.apache.blur.thrift.BadConnectionException: Could not connect to
controller/shard server. All connections are bad.
        at
org.apache.blur.thrift.BlurClientManager.execute(BlurClientManager.java:235)
        at
org.apache.blur.thrift.BlurClient$BlurClientInvocationHandler.invoke(BlurClient.java:56)
        at com.sun.proxy.$Proxy0.enqueueMutate(Unknown Source)


In the log I found the following stack trace pointing at the offending code:


stackTraceStr:null, errorType:UNKNOWN)
        at
org.apache.blur.manager.writer.MutatableAction.merge(MutatableAction.java:460)
        at
org.apache.blur.manager.writer.MutatableAction.reduceMutates(MutatableAction.java:439)
        at
org.apache.blur.manager.writer.BaseQueueReader$1.run(BaseQueueReader.java:65)
        at java.lang.Thread.run(Thread.java:724)


Just to test whether MutatableAction.reduceMutates is the culprit, I
modified BaseQueueReader.java to perform only doMutate(mutations);

I commented out the line mutations = MutatableAction.reduceMutates(mutations);

With this change, when I ran the test again for 90,000 documents, all
documents were indexed properly, and it took around 157 seconds, an
indexing rate of *575 records/sec, a jump of 17 times*.
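For clarity, the change amounts to skipping the reduce step in the queue
reader's polling thread. A paraphrased sketch (the surrounding loop is
reconstructed from the stack trace above, not copied from the actual
source):

```
// Inside the BaseQueueReader polling thread (paraphrased sketch):
List<RowMutation> mutations = takeFromQueue();            // drain queued mutations
// mutations = MutatableAction.reduceMutates(mutations);  // commented out for this test
doMutate(mutations);                                      // index without the merge/reduce step
```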

Just to give an idea of index size, the 90K documents produce a table size
of 270MB in Blur.
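As a quick sanity check, the quoted rates follow directly from the counts
and timings (with whole-number rounding, 90,000/157 comes out at about 573
records/sec, close to the 575 quoted above):

```java
// Back-of-envelope check of the indexing rates quoted in the three tests.
public class ThroughputCheck {
    public static void main(String[] args) {
        long mutateRate  = Math.round(10000.0 / 300.0); // Test 1: plain mutate
        long enqueueRate = Math.round(10000.0 / 23.0);  // Test 2: enqueueMutate
        long patchedRate = Math.round(90000.0 / 157.0); // Test 3: reduceMutates bypassed
        System.out.println(mutateRate + " " + enqueueRate + " " + patchedRate);
        // Integer speedups relative to the plain mutate path.
        System.out.println((enqueueRate / mutateRate) + "x and "
                + (patchedRate / mutateRate) + "x");
    }
}
```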

Regards,
Dibyendu








On Fri, Mar 7, 2014 at 9:51 PM, Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com> wrote:

> Thanks Aaron for the detailed explanation. I just browsed through the
> changes. I think we do not need the TableQueueReader now. I will try out
> the thrift enqueueMutate method to see how it performs. We will be using a
> Kafka client to populate the queue. Will let you know how that goes.
>
> Regards,
> Dibyendu
>
>
> On Fri, Mar 7, 2014 at 9:12 PM, Aaron McCurry <amccurry@gmail.com> wrote:
>
>> I have pushed most of the features needed for the queue, and here's a run
>> down on how it works.  I have left the original QueueReader in place at the
>> shards but renamed it to ShardQueueReader, which requires the data to be
>> partitioned correctly.  It also makes use of an in-memory blocking queue
>> that I will be changing to an HDFS-backed queue so the memory resources
>> won't be affected under heavy write load.  Then I have created another set
>> of thrift calls, enqueueMutate and enqueueMutateBatch, that feed the
>> internal queue.  Both of these methods are implemented on the controller
>> and the shard server.  There will be a TableQueueReader that can run inside
>> the controller to read from a queue and deal with the partitioning
>> inside the controller.  The class is written and committed, but the logic
>> to instantiate and run it has not been written.
>>
>> However, using the controller API (standard Thrift client) to write
>> RowMutations via the enqueueMutate method from Storm could be an option
>> right now without needing to implement anything that runs inside of Blur.
>> The only issue now is the blocking nature of the in-memory queue.  I will
>> be working to finish this feature before the release, but I believe that it
>> is mostly in a state to evaluate.  The only issue that I can see is that
>> writing data in via the enqueueMutate method could have some performance
>> slowdowns once it hits the max queue length; once the HDFS-backed version
>> is in place, that slowdown will be less apparent.
>>
>> So here's a run down on where the feature is lacking:
>>
>> 1.  The shard queues are an in-memory-only data structure, so data can
>> be lost at this point if a shard fails, and because they have a finite
>> length they can block under heavy load.  This is the one I see as a must
>> before the release.
>> 2.  A way to run the table queue reader in the controllers; but with the
>> rework of the API, I'm not sure you all would really need this anymore.
>>
>> Let me know if you all need any help getting started with this updated
>> code.
>>
>> Thanks!
>>
>> Aaron
>>
>>
>>
>>
>>
>> On Fri, Mar 7, 2014 at 10:00 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattachary@gmail.com> wrote:
>>
>>> Hi Aaron,
>>>
>>> Do you still plan to have this real-time queue-based indexing feature
>>> for Blur 0.2.2?  I know you are very busy with the 0.2.2 release; I just
>>> wanted to know if this will be coming soon.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>>
>>>> On Tue, Mar 4, 2014 at 8:50 AM, Jonathan Hodges <hodgesz@gmail.com> wrote:
>>>
>>>> Nothing to add, I agree the Kafka partitions don't need to match the
>>>> Blur partitions.
>>>>
>>>>
>>>> On Mon, Mar 3, 2014 at 7:17 PM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>
>>>>> Hi Aaron,
>>>>>
>>>>> No, I do not see that we need to match Kafka partitions with Blur
>>>>> partitions. In fact, the number of partitions in Kafka and the number
>>>>> of shards in Blur need not match either.
>>>>>
>>>>> Jonathan, do you have anything to add here?
>>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>>
>>>>> On Mon, Mar 3, 2014 at 11:35 PM, Aaron McCurry <amccurry@gmail.com> wrote:
>>>>>
>>>>>> Ok, I do have a question.  Do you see a use for the current use case,
>>>>>> where Kafka partitions match Blur partitions and the clients pushing
>>>>>> messages into Kafka partition the data into the Kafka partitions to
>>>>>> match Blur partitions?  The reason I ask is I want to know whether I
>>>>>> should keep the current low-level API pluggable or not.
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 3, 2014 at 10:29 AM, Dibyendu Bhattacharya <
>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Aaron, yes I figured out how Blur uses this API; you do
>>>>>>> not need to take a look at it.
>>>>>>>
>>>>>>> Once you are done with the new design of the queue feature, do let
>>>>>>> me know; I will try to integrate Kafka into it and test it.
>>>>>>>
>>>>>>> Dibyendu
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 3, 2014 at 8:07 PM, Aaron McCurry <amccurry@gmail.com> wrote:
>>>>>>>
>>>>>>>> Based on your post on the mailing list, I assume that you got what
>>>>>>>> you needed working, or at least figured out how Blur was using the
>>>>>>>> API.  Let me know if you need me to take a look at this or not.
>>>>>>>> Also, I'm planning on spending some time this afternoon working
>>>>>>>> through making this feature easier to use.
>>>>>>>>
>>>>>>>> Aaron
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Feb 28, 2014 at 10:03 AM, Aaron McCurry <amccurry@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Dibyendu,
>>>>>>>>>
>>>>>>>>> It will take me a little while to digest this.  I will try to get
>>>>>>>>> back to you later this afternoon.  Thanks!
>>>>>>>>>
>>>>>>>>> Aaron
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 28, 2014 at 8:29 AM, Dibyendu Bhattacharya <
>>>>>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Just forwarding this so you can tell me whether the approach is
>>>>>>>>>> correct for a Kafka consumer with multiple Kafka partitions
>>>>>>>>>> indexed into multiple Blur table shards.
>>>>>>>>>>
>>>>>>>>>> Sorry that the code is not that clean.
>>>>>>>>>>
>>>>>>>>>> Dibyendu
>>>>>>>>>>
>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>> From: Dibyendu Bhattacharya <dibyendu.bhattachary@gmail.com>
>>>>>>>>>> Date: Fri, Feb 28, 2014 at 5:32 PM
>>>>>>>>>> Subject: Re: new queue capability
>>>>>>>>>> To: blur-dev@incubator.apache.org, Jonathan Hodges <hodgesz@gmail.com>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I was just playing with the new QueueReader API, and as Tim
>>>>>>>>>> pointed out, it's very low level.  I still tried to implement a
>>>>>>>>>> KafkaConsumer.
>>>>>>>>>>
>>>>>>>>>> Here is my use case. Let me know if I have approached it correctly.
>>>>>>>>>>
>>>>>>>>>> I have a given topic in Kafka which has 3 partitions, and in
>>>>>>>>>> Blur I have a table with 2 shards.  I need to index all messages
>>>>>>>>>> from the Kafka topic into the Blur table.
>>>>>>>>>>
>>>>>>>>>> I used the Kafka ConsumerGroup API to consume in parallel in 2
>>>>>>>>>> streams (from the 3 partitions) for indexing into the 2 Blur
>>>>>>>>>> shards.  As the ConsumerGroup API allows me to split any Kafka
>>>>>>>>>> topic into N streams, I can choose N to be my target shard count,
>>>>>>>>>> here 2.
>>>>>>>>>>
>>>>>>>>>> For both shards I created two ShardContexts and two
>>>>>>>>>> BlurIndexSimpleWriters.  (Is this okay?)
>>>>>>>>>>
>>>>>>>>>> Now, I modified BlurIndexSimpleWriter to get a handle to the
>>>>>>>>>> _queueReader object.  I used this _queueReader to populate the
>>>>>>>>>> respective shard's queue, taking messages from the KafkaStreams.
>>>>>>>>>>
>>>>>>>>>> Here is the test case (KafkaReaderTest), the KafkaStreamReader
>>>>>>>>>> (which reads the Kafka stream), and the KafkaQueueReader (the
>>>>>>>>>> queue interface for Blur).
>>>>>>>>>>
>>>>>>>>>> Also attached is the modified BlurIndexSimpleWriter.  I just added:
>>>>>>>>>>
>>>>>>>>>>   public QueueReader getQueueReader() {
>>>>>>>>>>     return _queueReader;
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> With these changes, I am able to read Kafka messages in parallel
>>>>>>>>>> streams and index them into the 2 shards.  All documents from
>>>>>>>>>> Kafka get indexed properly, and after the test cases run I can
>>>>>>>>>> see two index directories for the two paths I created.
>>>>>>>>>>
>>>>>>>>>> Let me know if this approach is correct.  In this code, I have
>>>>>>>>>> not taken care of the shard-failure logic, and as Tim pointed
>>>>>>>>>> out, if that can be abstracted from the client, that would be
>>>>>>>>>> great.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dibyendu
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 27, 2014 at 9:40 PM, Aaron McCurry <
>>>>>>>>>> amccurry@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What if we provide an implementation of the QueueReader concept
>>>>>>>>>>> that does what you are discussing.  That way, in more extreme
>>>>>>>>>>> cases when the user is forced into implementing the lower-level
>>>>>>>>>>> API (perhaps for performance), they can still do it, but in the
>>>>>>>>>>> normal case the partitioning (and other difficult issues) are
>>>>>>>>>>> handled by the controllers.
>>>>>>>>>>>
>>>>>>>>>>> I could see adding an enqueueMutate call to the controllers that
>>>>>>>>>>> pushes the mutates to the correct buckets for the user.  At the
>>>>>>>>>>> same time, we could allow each of the controllers to pull from an
>>>>>>>>>>> external queue and push the mutates to the correct buckets for
>>>>>>>>>>> the shards.  I could see a couple of different ways of handling
>>>>>>>>>>> this.
>>>>>>>>>>>
>>>>>>>>>>> However, I do agree that right now there is too much burden on
>>>>>>>>>>> the user for the 95% case.  We should make this simpler.
>>>>>>>>>>>
>>>>>>>>>>> Aaron
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 27, 2014 at 10:07 AM, Tim Williams
<
>>>>>>>>>>> williamstw@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> > I've been playing around with the new QueueReader stuff and
>>>>>>>>>>> > I'm starting to believe it's at the wrong level of abstraction
>>>>>>>>>>> > - in the shard context - for a user.
>>>>>>>>>>> >
>>>>>>>>>>> > Between having to know about the BlurPartitioner and handling
>>>>>>>>>>> > all the failure nuances, I'm thinking a much friendlier
>>>>>>>>>>> > approach would be to have the client implement a single
>>>>>>>>>>> > message pump that Blur takes from and handles.
>>>>>>>>>>> >
>>>>>>>>>>> > Maybe on startup the controllers compete for the lead
>>>>>>>>>>> > QueueReader position, create it from the TableContext, and run
>>>>>>>>>>> > with it?  The user would still need to deal with controller
>>>>>>>>>>> > failures, but that seems easier to reason about than shard
>>>>>>>>>>> > failures.
>>>>>>>>>>> >
>>>>>>>>>>> > The way it's crafted right now, the user seems burdened with a
>>>>>>>>>>> > lot of the hard problems that Blur otherwise solves.
>>>>>>>>>>> > Obviously, it trades off a high burden for one of the
>>>>>>>>>>> > controllers.
>>>>>>>>>>> >
>>>>>>>>>>> > Thoughts?
>>>>>>>>>>> > --tim
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
