helix-user mailing list archives

From kishore g <g.kish...@gmail.com>
Subject Re: Add performant IPC (Helix actors)
Date Sat, 12 Jul 2014 18:27:38 GMT
Hi Vlad,

Yes the idea is to have a pluggable architecture where Helix provides the
messaging channel without serializing/deserializing the data. In the end, I
envision having most of the building blocks needed to build a distributed
system. One should be able to build datastore/search/OLAP/pub-sub/stream
processing systems easily on top of Helix.

I am curious to see what kind of tuning you guys did to handle 1.5M QPS. Our
goal is to have 1M QPS between two servers. In order to achieve this we
might need pipelining/batching along with ordering guarantees. This is
non-trivial and error-prone, so I see huge value in providing this feature.
Most systems have built some form of replication scheme on their own. There
are two parts to replication: one is the high-throughput message exchange,
and the other is the scheme itself (synchronous replication, async
replication, chained replication, consensus, etc.). The first part is the
common part that is needed across all replication schemes.

Of course getting this right is not easy and the design is tricky, but I
feel it's worth a try.

This is the high level flow I had discussed with Greg.

Each server can host P partitions and is listening at one port. Based on
how many execution threads are needed, we can create C channels. Every
partition maps to a specific channel. On each channel we can guarantee
message ordering. We simply provide a callback to handle each message and
pass the received bytes without deserializing. It's up to the callback to
either process it synchronously or asynchronously.
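
A minimal Java sketch of the flow above, under stated assumptions: the names
PartitionChannels, RawMessageCallback, channelFor and dispatch are
hypothetical and not part of Helix, and a single-threaded executor stands in
for a channel. Because each partition always maps to the same channel and
each channel has one thread, messages for a partition are handled in the
order they were dispatched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PartitionChannels {
    /** Hypothetical callback: receives the raw bytes, never a deserialized object. */
    public interface RawMessageCallback {
        void onMessage(int partition, byte[] payload);
    }

    private final List<ExecutorService> channels = new ArrayList<>();
    private final RawMessageCallback callback;

    public PartitionChannels(int channelCount, RawMessageCallback callback) {
        this.callback = callback;
        for (int i = 0; i < channelCount; i++) {
            // One thread per channel, so a channel handles its messages in FIFO order.
            channels.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Every partition maps to exactly one channel (simple modulo assignment). */
    public int channelFor(int partition) {
        return Math.floorMod(partition, channels.size());
    }

    /** Hands the undecoded bytes to the callback on the partition's channel. */
    public void dispatch(int partition, byte[] payload) {
        channels.get(channelFor(partition))
                .execute(() -> callback.onMessage(partition, payload));
    }

    /** Stops accepting work and waits briefly for queued messages to drain. */
    public void shutdown() {
        for (ExecutorService c : channels) {
            c.shutdown();
        }
        for (ExecutorService c : channels) {
            try {
                c.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

A callback that wants asynchronous processing can hand the bytes to its own
queue and return immediately; one that processes inline blocks only its own
channel.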

There are more details that are difficult to go over in email. We should
probably write up the design and then deep dive on the details.

thanks,
Kishore G

On Sat, Jul 12, 2014 at 8:08 AM, vlad.gm@gmail.com <vlad.gm@gmail.com>
wrote:

> I agree that having a messaging scheme that is not ZooKeeper-reliant would
> help in building some synchronization protocols between clients and
> replicas. I am thinking of cases such as writing inside Kafka to replicated
> partitions or our own application, a replicated key/value store.
>
> For me, service discovery bridges the gap between an application wanting
> to address a service (in Helix's case more particularly, a service/resource
> name/partition tuple) and finding out a list of hostnames that can receive
> that request. After that, whether the application wants to use Helix itself
> or another messaging stack is an open problem, perhaps a clear separation
> of the discovery/messaging layers would allow for quickly swapping
> different pieces here.
>
> The interesting part (in my view) in providing fast messaging as part of
> Helix is the ability to use the service topology information that Helix
> hosts (number of replicas and their placement) together with the messaging
> layer as sufficient components for realizing a consensus algorithm between
> replicas, that is providing, on top of the regular service discovery
> function of writing to one instance of a service, the function of writing
> with consensus verification to all replicas of a service.
>
> In terms of capabilities, looking at Kafka and KV/Store as use cases, this
> would mean running a whole protocol between the replicas of the service to
> achieve a certain synchronization outcome (be it Paxos, 2 Phase Commit or
> Chain Replication - Paxos would probably not be needed since Helix already
> provides a resilient master for the partition, so there is a clear
> hierarchy). Kafka suggests even more cases if we take the interaction
> between client and service into account, namely specifying the call to
> return after the master gets the data or after consensus between master and
> replicas has been achieved, or sending data asynchronously (the
> combination of the producer.type and request.required.acks parameters of
> the producer).
>
> The only reason for which I would like to see a pluggable architecture is
> that in high volume applications the messaging stack requires a lot of
> tuning. As I said at the meet-up, in our company we deal with about 1.5M
> QPS, so having serialization/deserialization overheads per message, or
> even separate listening/executor threads can be resource intensive.
> Whenever we can, we are tempted to use a staged event-driven architecture,
> with few threads that handle large message pools. This also raises the
> question of how to implement the upper-layer callbacks (that would perform
> the synchronization-related actions) without spawning new threads.
>
> Regards,
> Vlad
>
>
> On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kishore@gmail.com> wrote:
>
>> Good point Vlad, I was thinking of defining the right abstraction and
>> possibly provide one implementation based. I think the synchronous and
>> asynchronous messaging should be covered as part of this.
>>
>> Also I think Finagle and the like cater more towards client-server
>> communication, but what we lack today is a good solution for peer-to-peer
>> transport. For example, if someone has to build a consensus layer, they
>> have to build everything from the ground up. Providing a base layer that
>> exchanges messages between peers on a per-partition basis can be a great
>> building block for different replication schemes.
>>
>> Overall messaging can be used for two cases
>> -- data and control.
>>
>>
>> CONTROL
>>
>> This is needed for control messages that occur rarely. For these types of
>> messages latency/throughput is not important, but it's important for
>> delivery to be reliable.
>>
>>
>> DATA
>>
>> This is mainly exchanging data between different roles,
>> controller-participant, participant-participant, spectator-participant.
>> These types of message exchanges occur quite frequently and having high
>> throughput and low latency is a requirement.
>>
>> For example having the following api and guarantee would allow one to
>> build synchronous replication, asynchronous, quorum etc.
>>
>> send(partition, message, ACK_MODE, callback)
>>
>> ACK_MODE can be ACK from ALL, QUORUM, 1, NONE, etc. The callback is fired
>> when one gets the response back from the receiver.
>>
>> This simple API allows both synchronous and asynchronous modes of
>> communication. We don't have to do the serialization/deserialization. The
>> hard part here would be to guarantee ordering between the messages. One
>> should be able to specify the message ordering requirement: FIFO on a
>> per-partition level, or don't care about ordering. Having this makes it
>> easy for one to implement replication schemes.
>>
>>  thanks,
>> Kishore G
>>
>>
>> On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <brandt.greg@gmail.com>
>> wrote:
>>
>>> Vlad,
>>>
>>> I'm not sure that the goal here is to cover all possible use cases. This is
>>> intended as basically a ZooKeeper-independent replacement for the current
>>> Helix cluster messaging service. I would argue that Helix is already that
>>> general framework for discovering the service endpoint, and that this is
>>> just one implementation of an underlying messaging stack (for which there
>>> is currently only the ZooKeeper-based implementation).
>>>
>>> Moreover, keeping it too general (i.e. providing the APIs outlined in the
>>> JIRA, but no implementation) puts much greater onus on the user. It would
>>> be nice for someone just starting out with Helix to have a pretty good
>>> messaging service readily available, as well as the APIs to implement more
>>> specific solutions if he or she so chooses.
>>>
>>> -Greg
>>>
>>>
>>> On Fri, Jul 11, 2014 at 11:07 AM, vlad.gm@gmail.com <vlad.gm@gmail.com>
>>> wrote:
>>>
>>> > This sounds like service discovery for a messaging solution. At this point
>>> > there would be some overlap with message buses such as Finagle. Perhaps
>>> > this should be a general framework for discovering the service endpoint
>>> > that could leave the actual underlying messaging stack open (be it
>>> > Finagle, Netty-based or ZeroMQ, for example). My only fear is that if the
>>> > messaging framework is encapsulated completely into Helix, it would be hard
>>> > to compete with tuned messaging bus solutions and cover all possible use
>>> > cases (for example, in my company, we use a large number of sub-cases along
>>> > the synchronous-to-asynchronous call spectrum).
>>> >
>>> > Regards,
>>> > Vlad
>>> >
>>> >
>>> > On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt <brandt.greg@gmail.com>
>>> > wrote:
>>> >
>>> >> (copied from HELIX-470)
>>> >>
>>> >> Helix is missing a high-performance way to exchange messages among
>>> >> resource partitions, with a user-friendly API.
>>> >>
>>> >> Currently, the Helix messaging service relies on creating many nodes in
>>> >> ZooKeeper, which can lead to ZooKeeper outages if messages are sent too
>>> >> frequently.
>>> >>
>>> >> In order to avoid this, high-performance NIO-based HelixActors should be
>>> >> implemented (in rough accordance with the actor model). HelixActors
>>> >> exchange messages asynchronously without waiting for a response, and are
>>> >> partition/state-addressable.
>>> >>
>>> >> The API would look something like this:
>>> >>
>>> >> public interface HelixActor<T> {
>>> >>     void send(Partition partition, String state, T message);
>>> >>     void register(String resource, HelixActorCallback<T> callback);
>>> >> }
>>> >> public interface HelixActorCallback<T> {
>>> >>     void onMessage(Partition partition, State state, T message);
>>> >> }
>>> >>
>>> >> #send should likely support wildcards for partition number and state, or
>>> >> its method signature might need to be massaged a little bit for more
>>> >> flexibility. But that's the basic idea.
>>> >>
>>> >> Nothing is inferred about the format of the messages - the only metadata
>>> >> we need to be able to interpret is (1) partition name and (2) state. The
>>> >> user provides a codec to encode / decode messages, so it's nicer to
>>> >> implement HelixActor#send and HelixActorCallback#onMessage.
>>> >>
>>> >> public interface HelixActorMessageCodec<T> {
>>> >>     byte[] encode(T message);
>>> >>     T decode(byte[] message);
>>> >> }
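
As an illustration of the codec idea, a minimal Java sketch for plain-text
messages. The HelixActorMessageCodec interface is repeated from the proposal
so the example compiles on its own; StringCodecExample and Utf8StringCodec
are hypothetical names, and UTF-8 is an assumed wire encoding.

```java
import java.nio.charset.StandardCharsets;

public class StringCodecExample {
    /** Codec interface as given in the proposal. */
    public interface HelixActorMessageCodec<T> {
        byte[] encode(T message);
        T decode(byte[] message);
    }

    /** Hypothetical codec that turns String messages into UTF-8 bytes and back. */
    public static class Utf8StringCodec implements HelixActorMessageCodec<String> {
        @Override
        public byte[] encode(String message) {
            return message.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String decode(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }
    }
}
```

The transport only ever sees the byte[] form, so user code can swap in any
serialization scheme without touching the messaging layer.
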
>>> >>
>>> >> Actors should support somewhere around 100k to 1M messages per second.
>>> >> The Netty framework is a potential implementation candidate, but should be
>>> >> thoroughly evaluated w.r.t. performance.
>>> >>
>>> >
>>> >
>>>
>>
>>
>
