helix-user mailing list archives

From kishore g <g.kish...@gmail.com>
Subject Re: Add performant IPC (Helix actors)
Date Wed, 23 Jul 2014 17:17:01 GMT
Thanks Greg, this looks great. I see how you have defined the interfaces such
that we can add other impls such as ZeroMQ, etc. The Netty impl looks good as
a starting point.

A couple of things:

-- HelixIPC seems to be templatized on the message type T. It also contains
start/stop/register methods. This means we would have to open one socket per
message type, which may not be a good idea.
-- It might be better to separate HelixIPC into HelixIPCService and
HelixIPC. The service would have start/stop, and one could get a concrete
impl of HelixIPC<MessageType> from the service (see the sketch below).
-- I am not sure where the callback should be. At this layer I think we will
have only one callback that we provide, which simply dispatches the message
(async) to the appropriate user callback.
-- MessageFormat might need additional fields, like:
----- a version of the message format, which allows us to enhance the message
format later
----- a message type field; Helix might have some message types that the app
users may not care about, and this allows us to intercept control messages.
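
Something along these lines is what the split could look like. All names here
are illustrative only, not the prototype's actual classes; the version and
message-type fields would live in the wire header:

import org.apache.helix.model.Partition;

// Illustrative sketch only: one HelixIPCService owns the single listening
// socket and the start/stop lifecycle, and hands out per-message-type channels.
public interface HelixIPCService {
    void start();
    void stop();

    // All message types are multiplexed over the one underlying socket; the
    // numeric message type id travels in the wire header, so Helix-internal
    // control messages can be intercepted before reaching user callbacks.
    <T> HelixIPC<T> getChannel(int messageTypeId, HelixIPCCallback<T> callback);
}

public interface HelixIPC<T> {
    void send(Partition partition, String state, T message);
}

public interface HelixIPCCallback<T> {
    void onMessage(Partition partition, String state, T message);
}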

thanks
Kishore G





On Mon, Jul 21, 2014 at 9:47 AM, Greg Brandt <brandt.greg@gmail.com> wrote:

> We talked about this in a bit more detail, and it seems like there are
> three basic components that we want:
>
> 1. Transport - physically moving messages
> 2. Resolution - map cluster scope to set of machines
> 3. Management - message state, errors, retries, etc.
>
> The transport interface should be generic enough to allow many
> implementations (e.g. Netty, Kafka, etc.). It may be valuable to have multiple
> implementations if one underlying system supports the desired message delivery
> guarantees better than another. For example, Kafka would be more amenable
> to at-least-once delivery, whereas something like Netty, with much less
> overhead than Kafka, would be better suited to at-most-once delivery.
>
> The goal here I think is to provide something that's most similar to Akka
> Actors, with the following guarantees:
>
> 1. at-most-once delivery
> 2. message ordering per sender-receiver pair
>
> A light-weight messaging primitive with those guarantees would allow
> implementation of many interesting things like Kishore mentions. Netty
> seems most suited to this task (it is the underlying transport for Akka as
> well). The reason to avoid Akka I think is that it imposes too much of its
> own philosophy on the user, and is really bloated. A Netty-based primitive
> could be provided out-of-the-box, and the interfaces it implements should
> be easy enough for users to implement on their own. It would be relatively
> trivial to add a Kafka-based implementation out-of-the-box as well.
>
> (Have a bit of a prototype here:
> https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/actor/netty/NettyHelixActor.java
> )
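
For illustration, a transport contract with just those two guarantees
(at-most-once delivery, ordering per sender-receiver pair) could be as small
as the following; the names are hypothetical and not taken from the linked
prototype:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;

// Hypothetical minimal transport contract; a Netty- or Kafka-backed class
// would sit behind it.
public interface MessageTransport {
    void start();
    void stop();

    // Fire-and-forget send: at-most-once delivery, messages ordered per
    // (sender, receiver) pair, payload treated as opaque bytes.
    void send(InetSocketAddress destination, ByteBuffer payload);

    // Single handler invoked for every inbound payload.
    void setReceiveHandler(BiConsumer<InetSocketAddress, ByteBuffer> handler);
}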
>
> The transport mechanism should be agnostic to anything going on in the
> cluster: the resolution module is totally separate, and maps cluster scope
> to sets of nodes. For example, one could send a message to the entire
> cluster, specific partitions of a resource, or even from one cluster to
> another (via composing resolution modules or something).
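
A hypothetical shape for the resolution module described above (ClusterScope
and ScopeResolver are made-up names, purely for illustration):

import java.net.InetSocketAddress;
import java.util.Set;

// Illustrative scope descriptor; a real version would presumably mirror
// Helix's cluster/resource/partition hierarchy.
public class ClusterScope {
    public final String cluster;
    public final String resource;   // null means the whole cluster
    public final String partition;  // null means all partitions of the resource

    public ClusterScope(String cluster, String resource, String partition) {
        this.cluster = cluster;
        this.resource = resource;
        this.partition = partition;
    }
}

// Hypothetical resolution contract: turns a scope into concrete endpoints,
// e.g. by reading cluster metadata that Helix already maintains.
interface ScopeResolver {
    Set<InetSocketAddress> resolve(ClusterScope scope);
}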
>
> Then there would be a management component, composed with the other two
> components, which manages user callbacks associated with varying cluster
> scopes, tracks messages based on ID in order to perform request / response
> or feedback-loop kinds of things, manages errors / timeouts / retries, etc.
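
A minimal sketch of that management layer, assuming it correlates requests
and responses by message id (class and method names are illustrative only):

import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.*;

// Hypothetical sketch: tracks outstanding requests by id and fails them on
// timeout. Error/retry policy would hook in here as well.
public class MessageManager {
    private final ConcurrentMap<UUID, CompletableFuture<ByteBuffer>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // Register interest in a reply before sending the request.
    public CompletableFuture<ByteBuffer> expectReply(UUID messageId, long timeoutMs) {
        CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
        pending.put(messageId, future);
        timer.schedule(() -> {
            CompletableFuture<ByteBuffer> f = pending.remove(messageId);
            if (f != null) {
                f.completeExceptionally(new TimeoutException("no reply for " + messageId));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return future;
    }

    // Called by the transport callback when a reply with a known id arrives.
    public void onReply(UUID messageId, ByteBuffer payload) {
        CompletableFuture<ByteBuffer> future = pending.remove(messageId);
        if (future != null) {
            future.complete(payload);
        }
    }
}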
>
> -Greg
>
>
>
> On Sat, Jul 12, 2014 at 11:27 AM, kishore g <g.kishore@gmail.com> wrote:
>
>> Hi Vlad,
>>
>> Yes, the idea is to have a pluggable architecture where Helix provides the
>> messaging channel without serializing/deserializing the data. In the end, I
>> envision having most of the building blocks needed to build a distributed
>> system. One should be able to build datastore/search/olap/pub-sub/stream
>> processing systems easily on top of Helix.
>>
>> I am curious to see what kind of tuning you guys did to handle 1.5M QPS. Our
>> goal is to have 1M QPS between the two servers. In order to achieve this we
>> might need pipelining/batching along with ordering guarantees. This is
>> non-trivial and error prone, and I see huge value in providing this feature.
>> Most systems have built some form of replication scheme on their own. There
>> are two parts to replication: one is the high-throughput message exchange,
>> and the other is the scheme itself, i.e. synchronous replication/async
>> replication/chained replication/consensus, etc. The first part is the common
>> part that is needed across all the replication schemes.
>>
>> Of course getting this right is not easy and the design is tricky, but I
>> feel it's worth a try.
>>
>> This is the high level flow I had discussed with Greg.
>>
>> Each server can host P partitions and listens on one port. Based on how many
>> execution threads are needed, we can create C channels. Every partition maps
>> to a specific channel. On each channel we can guarantee message ordering. We
>> simply provide a callback to handle each message and pass the received bytes
>> without deserializing. It's up to the callback to either process it
>> synchronously or asynchronously.
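
A minimal sketch of that partition-to-channel mapping, with illustrative
names only:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Illustrative only: C single-threaded channels per server, so message order
// is preserved per channel, and therefore per partition.
public class ChannelDispatcher {
    private final ExecutorService[] channels;

    public ChannelDispatcher(int numChannels) {
        channels = new ExecutorService[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channels[i] = Executors.newSingleThreadExecutor();
        }
    }

    // A given partition always hashes to the same channel, so its messages are
    // handled in arrival order; the callback gets the raw, undeserialized bytes.
    public void dispatch(String partitionName, byte[] rawMessage, Consumer<byte[]> callback) {
        int channel = Math.floorMod(partitionName.hashCode(), channels.length);
        channels[channel].execute(() -> callback.accept(rawMessage));
    }
}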
>>
>> There are more details that are difficult to go over in email. We should
>> probably write up the design and then deep dive into the details.
>>
>> thanks,
>> Kishore G
>>
>>
>> On Sat, Jul 12, 2014 at 8:08 AM, vlad.gm@gmail.com <vlad.gm@gmail.com>
>> wrote:
>>
>> > I agree that having a messaging scheme that is not ZooKeeper-reliant would
>> > help in building some synchronization protocols between clients and
>> > replicas. I am thinking of cases such as writing inside Kafka to replicated
>> > partitions, or our own application, a replicated key/value store.
>> >
>> > For me, service discovery bridges the gap between an application wanting
>> > to address a service (in Helix's case, more particularly, a
>> > service/resource name/partition tuple) and finding out a list of hostnames
>> > that can receive that request. After that, whether the application wants
>> > to use Helix itself or another messaging stack is an open problem; perhaps
>> > a clear separation of the discovery/messaging layers would allow for
>> > quickly swapping different pieces here.
>> >
>> > The interesting part (in my view) of providing fast messaging as part of
>> > Helix is the ability to use the service topology information that Helix
>> > hosts (number of replicas and their placement) together with the messaging
>> > layer as sufficient components for realizing a consensus algorithm between
>> > replicas; that is, providing, on top of the regular service discovery
>> > function of writing to one instance of a service, the function of writing
>> > with consensus verification to all replicas of a service.
>> >
>> > In terms of capabilities, looking at Kafka and a KV store as use cases,
>> > this would mean running a whole protocol between the replicas of the
>> > service to achieve a certain synchronization outcome (be it Paxos, 2-Phase
>> > Commit or Chain Replication; Paxos would probably not be needed since
>> > Helix already provides a resilient master for the partition, so there is a
>> > clear hierarchy). Kafka suggests even more cases if we take the
>> > interaction between client and service into account, namely specifying the
>> > call to return after the master gets the data or after consensus between
>> > master and replicas has been achieved, or sending data asynchronously (the
>> > combination of the producer.type and request.required.acks parameters of
>> > the producer).
>> >
>> > The only reason for which I would like to see a pluggable architecture is
>> > that in high-volume applications the messaging stack requires a lot of
>> > tuning. As I said at the meet-up, in our company we deal with about 1.5M
>> > QPS, so having serialization/deserialization overheads per message, or
>> > even separate listening/executor threads, can be resource intensive.
>> > Whenever we can, we are tempted to use a staged event-driven architecture,
>> > with few threads that handle large message pools. This also raises the
>> > question of how to implement the upper-layer callbacks (that would perform
>> > the synchronization-related actions) without spawning new threads.
>> >
>> > Regards,
>> > Vlad
>> >
>> >
>> > On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kishore@gmail.com>
>> wrote:
>> >
>> >> Good point Vlad, I was thinking of defining the right abstraction and
>> >> possibly providing one implementation based on it. I think synchronous and
>> >> asynchronous messaging should be covered as part of this.
>> >>
>> >> Also, I think Finagle and the likes of it cater more towards client-server
>> >> communication, but what we lack today is a good solution for peer-to-peer
>> >> transport. For example, if someone has to build a consensus layer, they
>> >> have to build everything from the ground up. Providing a base layer that
>> >> exchanges messages between peers on a per-partition basis can be a great
>> >> building block for different replication schemes.
>> >>
>> >> Overall messaging can be used for two cases
>> >> -- data and control.
>> >>
>> >>
>> >> CONTROL
>> >>
>> >> This is needed for control messages that occur rarely. For these types of
>> >> messages, latency/throughput is not important, but it is important for
>> >> them to be reliable.
>> >>
>> >>
>> >> DATA
>> >>
>> >> This is mainly for exchanging data between different roles:
>> >> controller-participant, participant-participant, spectator-participant.
>> >> These types of message exchanges occur quite frequently, and having high
>> >> throughput and low latency is a requirement.
>> >>
>> >> For example, having the following API and guarantees would allow one to
>> >> build synchronous replication, asynchronous replication, quorum, etc.
>> >>
>> >> send(partition, message, ACK_MODE, callback), where ACK_MODE can be
>> >> something like ACK from ALL, QUORUM, 1, NONE, etc., and the callback is
>> >> fired when one gets the response back from the receiver.
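
One possible rendering of that API, purely for illustration (the AckMode
values and the callback shape are assumptions, not an agreed-upon signature):

// Hypothetical sketch of the send API described above.
public enum AckMode { NONE, ONE, QUORUM, ALL }

public interface AckCallback {
    void onAck(String partition, int acksReceived);   // requested ack level reached
    void onError(String partition, Throwable cause);  // timeout or failure
}

public interface ReplicationChannel {
    // Payload is passed through as raw bytes; no serialization is imposed.
    void send(String partition, byte[] message, AckMode ackMode, AckCallback callback);
}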
>> >>
>> >> This simple API can allow both synchronous and asynchronous modes of
>> >> communication. We don't have to do the serialization/deserialization. The
>> >> hard part here would be to guarantee ordering between the messages. One
>> >> should be able to specify the message ordering requirement: FIFO at a
>> >> per-partition level, or no ordering at all. Having this makes it easy for
>> >> one to implement replication schemes.
>> >>
>> >>  thanks,
>> >> Kishore G
>> >>
>> >>
>> >> On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <brandt.greg@gmail.com>
>> >> wrote:
>> >>
>> >>> Vlad,
>> >>>
>> >>> I'm not sure that the goal here is to cover all possible use cases. This
>> >>> is intended as basically a replacement for the current Helix cluster
>> >>> messaging service, one that doesn't rely on ZooKeeper. I would argue that
>> >>> Helix is already that general framework for discovering the service
>> >>> endpoint, and that this is just one implementation of an underlying
>> >>> messaging stack (for which there is currently only the ZooKeeper-based
>> >>> implementation).
>> >>>
>> >>> Moreover, keeping it too general (i.e. providing the APIs outlined in
>> >>> the JIRA, but no implementation) puts much greater onus on the user. It
>> >>> would be nice for someone just starting out with Helix to have a pretty
>> >>> good messaging service readily available, as well as the APIs to
>> >>> implement more specific solutions if he or she so chooses.
>> >>>
>> >>> -Greg
>> >>>
>> >>>
>> >>> On Fri, Jul 11, 2014 at 11:07 AM, vlad.gm@gmail.com <
>> vlad.gm@gmail.com>
>> >>> wrote:
>> >>>
>> >>> > This sounds like service discovery for a messaging solution. At this
>> >>> > point there would be some overlap with message buses such as Finagle.
>> >>> > Perhaps this should be a general framework for discovering the service
>> >>> > endpoint that could leave the actual underlying messaging stack open
>> >>> > (be it Finagle, Netty-based or ZeroMQ, for example). My only fear is
>> >>> > that if the messaging framework is encapsulated completely into Helix,
>> >>> > it would be hard to compete with tuned messaging bus solutions and
>> >>> > cover all possible use cases (for example, in my company, we use a
>> >>> > large number of sub-cases on the synchronous call to asynchronous call
>> >>> > spectrum).
>> >>> >
>> >>> > Regards,
>> >>> > Vlad
>> >>> >
>> >>> >
>> >>> > On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt <
>> brandt.greg@gmail.com>
>> >>> > wrote:
>> >>> >
>> >>> >> (copied from HELIX-470)
>> >>> >>
>> >>> >> Helix is missing a high-performance way to exchange messages among
>> >>> >> resource partitions, with a user-friendly API.
>> >>> >>
>> >>> >> Currently, the Helix messaging service relies on creating many nodes
>> >>> >> in ZooKeeper, which can lead to ZooKeeper outages if messages are sent
>> >>> >> too frequently.
>> >>> >>
>> >>> >> In order to avoid this, high-performance NIO-based HelixActors should
>> >>> >> be implemented (in rough accordance with the actor model). HelixActors
>> >>> >> exchange messages asynchronously without waiting for a response, and
>> >>> >> are partition/state-addressable.
>> >>> >>
>> >>> >> The API would look something like this:
>> >>> >>
>> >>> >> public interface HelixActor<T> {
>> >>> >>     void send(Partition partition, String state, T message);
>> >>> >>     void register(String resource, HelixActorCallback<T> callback);
>> >>> >> }
>> >>> >> public interface HelixActorCallback<T> {
>> >>> >>     void onMessage(Partition partition, State state, T message);
>> >>> >> }
>> >>> >>
>> >>> >> #send should likely support wildcards for partition number and state,
>> >>> >> or its method signature might need to be massaged a little bit for
>> >>> >> more flexibility. But that's the basic idea.
>> >>> >>
>> >>> >> Nothing is inferred about the format of the messages - the only
>> >>> >> metadata we need to be able to interpret is (1) partition name and (2)
>> >>> >> state. The user provides a codec to encode / decode messages, so it's
>> >>> >> nicer to implement HelixActor#send and HelixActorCallback#onMessage.
>> >>> >>
>> >>> >> public interface HelixActorMessageCodec<T> {
>> >>> >>     byte[] encode(T message);
>> >>> >>     T decode(byte[] message);
>> >>> >> }
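
For example, usage might look roughly like this. This is a hypothetical
sketch only: the codec is a plain UTF-8 String codec, the NettyHelixActor
constructor shape is assumed rather than confirmed, and clusterName/port are
assumed to be in scope:

import java.nio.charset.StandardCharsets;

// Hypothetical usage sketch of the interfaces above.
HelixActorMessageCodec<String> codec = new HelixActorMessageCodec<String>() {
    public byte[] encode(String message) { return message.getBytes(StandardCharsets.UTF_8); }
    public String decode(byte[] message) { return new String(message, StandardCharsets.UTF_8); }
};

HelixActor<String> actor = new NettyHelixActor<String>(clusterName, port, codec); // assumed constructor
actor.register("MyResource", new HelixActorCallback<String>() {
    public void onMessage(Partition partition, State state, String message) {
        System.out.println(partition + "@" + state + ": " + message);
    }
});
actor.send(new Partition("MyResource_0"), "MASTER", "ping");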
>> >>> >>
>> >>> >> Actors should support somewhere around 100k to 1M messages per
>> >>> >> second. The Netty framework is a potential implementation candidate,
>> >>> >> but should be thoroughly evaluated w.r.t. performance.
>> >>> >>
>> >>> >
>> >>> >
>> >>>
>> >>
>> >>
>> >
>>
>
>
