helix-user mailing list archives

From "vlad.gm@gmail.com" <vlad...@gmail.com>
Subject Re: Add performant IPC (Helix actors)
Date Thu, 21 Aug 2014 01:00:58 GMT
Hi Greg,

Within Turn, a few of us were looking at Helix as a possible solution for
service discovery. It seems that the new system would keep the address
resolution of the old messaging system but replace ZooKeeper with a direct
connection for high throughput. Incidentally, we have a similar system, so
we are more interested in the addressing part.
One thing that came up within our discussion was the possibility of
accessing services in other datacenters. Are there plans to offer
inter-datacenter discovery capabilities? Would that be based on
ZK observers or some other import mechanism (someone would need to connect
to the remote ZK, preferably not the client)?
Also, especially in the cross-datacenter case, there are clear parallels
to using DNS for service discovery (I think Netflix published a blog
post where they detailed this as an option). What would we gain and lose by
using ZK as the building block as opposed to DNS? One thing that comes to
mind is that ZK-based service discovery in Helix can be configured as
push-from-ZK or poll-by-client, while DNS entries time out and must always
be polled again.

Regards,
Vlad


On Wed, Aug 20, 2014 at 5:46 PM, Greg Brandt <brandt.greg@gmail.com> wrote:

> Hey Henry,
>
> Initially, I thought the same thing. But after evaluating Akka Actors, I
> think they're a fundamentally different approach to developing distributed
> systems, rather than simply a messaging layer implementation. Let me
> explain...
>
> The primary difference between Akka Actors and Helix is that the former
> prefers loose, decentralized control, whereas the latter prefers strong,
> centralized control. Akka applications are expected to manage state by
> themselves, whereas in Helix, that responsibility is delegated to the
> controller. User-written Akka Actor implementations are analogous to
> Helix Participant implementations.
>
> So with that perspective, it would be a bit of a kludge to leverage Akka
> Actors for simple cluster-scope-addressable message passing. Actually,
> Helix already provides this messaging feature, but it is done via ZooKeeper,
> which is dangerous and less than performant for a high volume of messages.
>
> The right thing to do seems to be to provide a set of better transport
> implementations (e.g. Netty, ActiveMQ, Kafka, etc.) for the
> cluster-scope-addressable messaging system, and better preserve the
> fundamental concepts in Helix.
>
> Let me know what you think.
>
> -Greg
>
>
>
> On Wed, Aug 20, 2014 at 9:55 AM, Henry Saputra <henry.saputra@gmail.com>
> wrote:
>
>> This seems like a fit for the Akka actor system [1]. Maybe we could just
>> use it as a base?
>>
>>
>> - Henry
>>
>> [1] http://akka.io
>>
>> On Mon, Jul 21, 2014 at 9:47 AM, Greg Brandt <brandt.greg@gmail.com>
>> wrote:
>> > We talked about this in a little more detail, and it seems like there
>> > are three basic components that we want:
>> >
>> > 1. Transport - physically moving messages
>> > 2. Resolution - map cluster scope to set of machines
>> > 3. Management - message state, errors, retries, etc.
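The three-way split above could be expressed as interfaces along these lines. This is a minimal sketch only: Transport, Resolver, and Messenger are hypothetical names, not part of Helix or the linked prototype, and a real management component would add the message state, error, and retry tracking the list describes.

```java
import java.util.Set;

// Hypothetical names for the three components; none of these exist in Helix.

/** Transport: physically moves opaque bytes to one node. */
interface Transport {
    void send(String nodeAddress, byte[] payload);

    void setReceiver(Receiver receiver);

    /** Invoked by the transport for every inbound payload. */
    interface Receiver {
        void onReceive(String fromAddress, byte[] payload);
    }
}

/** Resolution: maps a cluster scope to the set of nodes it currently covers. */
interface Resolver<S> {
    Set<String> resolve(S scope);
}

/** Management: composes transport and resolution; retries and error tracking would live here. */
final class Messenger<S> {
    private final Transport transport;
    private final Resolver<S> resolver;

    Messenger(Transport transport, Resolver<S> resolver) {
        this.transport = transport;
        this.resolver = resolver;
    }

    /** Sends one payload to every node in the scope and returns the number of targets. */
    int send(S scope, byte[] payload) {
        Set<String> targets = resolver.resolve(scope);
        for (String node : targets) {
            transport.send(node, payload);
        }
        return targets.size();
    }
}
```

Keeping Transport ignorant of scopes is what lets a Netty- or Kafka-backed implementation be swapped in without touching resolution.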
>> >
>> > The transport interface should be generic enough to allow many
>> > implementations (e.g. Netty, Kafka, etc.). It may be valuable to offer
>> > multiple implementations if one underlying system supports the desired
>> > message delivery guarantees better than another. For example, Kafka would
>> > be more amenable to at-least-once delivery, whereas something like Netty,
>> > with much less overhead than Kafka, would be better for at-most-once
>> > delivery.
>> >
>> > The goal here, I think, is to provide something that's most similar to
>> > Akka Actors, with the following guarantees:
>> >
>> > 1. at-most-once delivery
>> > 2. message ordering per sender-receiver pair
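One way to get both guarantees on top of a transport that may reconnect or resend is a per-pair sequence number. The sketch below assumes each message carries a sequence number stamped by the sender for one specific receiver; PairSequencer is a hypothetical helper. Dropping anything at or below the last delivered number keeps delivery ordered and at-most-once, at the cost of silently discarding messages that arrive late.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: per sender-receiver sequence numbers for ordered, at-most-once delivery. */
final class PairSequencer {
    // Sender side: last sequence number issued to each receiver.
    private final Map<String, Long> issuedByReceiver = new HashMap<>();
    // Receiver side: highest sequence number already delivered from each sender.
    private final Map<String, Long> deliveredBySender = new HashMap<>();

    /** Stamps the next outgoing message to the given receiver; numbering starts at 1. */
    synchronized long nextSeq(String receiver) {
        return issuedByReceiver.merge(receiver, 1L, Long::sum);
    }

    /** Returns true if the message should be handed to the user callback. */
    synchronized boolean accept(String sender, long seq) {
        long last = deliveredBySender.getOrDefault(sender, 0L);
        if (seq <= last) {
            return false; // duplicate, or overtaken by a newer message: drop, never redeliver
        }
        deliveredBySender.put(sender, seq); // gaps are fine: lost messages are not retried
        return true;
    }
}
```

Over a single TCP connection (as with Netty) order is already preserved; a counter like this mainly matters across reconnects or when more than one connection links the same pair.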
>> >
>> > A lightweight messaging primitive with those guarantees would allow
>> > implementation of many interesting things like the ones Kishore mentions.
>> > Netty seems most suited to this task (it is the underlying transport for
>> > Akka as well). The reason to avoid Akka, I think, is that it imposes too
>> > much of its own philosophy on the user and is really bloated. A
>> > Netty-based primitive could be provided out of the box, and the
>> > interfaces it implements should be easy enough for users to implement on
>> > their own. It would be relatively trivial to add a Kafka-based
>> > implementation out of the box as well.
>> >
>> > (Have a bit of a prototype here:
>> > https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/actor/netty/NettyHelixActor.java)
>> >
>> > The transport mechanism should be agnostic to anything going on in the
>> > cluster: the resolution module is totally separate, and maps cluster
>> > scope to sets of nodes. For example, one could send a message to the
>> > entire cluster, specific partitions of a resource, or even from one
>> > cluster to another (via composing resolution modules or something).
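A resolution module of this kind can be sketched as a filter over a routing snapshot. The sketch assumes the snapshot has the shape of a Helix external view (partition to instance to state) and treats a null field in the hypothetical Scope class as a wildcard; SnapshotResolver is illustrative, not a Helix API, and cross-cluster composition is not shown.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical cluster scope; a null field acts as a wildcard. */
final class Scope {
    final String partition; // e.g. "myDB_3", or null for all partitions
    final String state;     // e.g. "MASTER", or null for any state

    Scope(String partition, String state) {
        this.partition = partition;
        this.state = state;
    }
}

/** Resolves a scope against a snapshot of partition -> (instance -> state). */
final class SnapshotResolver {
    private final Map<String, Map<String, String>> view;

    SnapshotResolver(Map<String, Map<String, String>> view) {
        this.view = view;
    }

    Set<String> resolve(Scope scope) {
        Set<String> nodes = new TreeSet<>();
        for (Map.Entry<String, Map<String, String>> p : view.entrySet()) {
            if (scope.partition != null && !scope.partition.equals(p.getKey())) {
                continue; // partition filter did not match
            }
            for (Map.Entry<String, String> replica : p.getValue().entrySet()) {
                if (scope.state == null || scope.state.equals(replica.getValue())) {
                    nodes.add(replica.getKey());
                }
            }
        }
        return nodes;
    }
}
```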
>> >
>> > Then there would be a management component, composed with the other two
>> > components, which manages user callbacks associated with varying cluster
>> > scopes, tracks messages based on ID in order to perform request/response
>> > or feedback-loop kinds of things, manages errors/timeouts/retries, etc.
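Request/response tracking by message ID might look like the following. It is a sketch under assumptions: the caller generates a unique ID, the transport echoes that ID back on the response, and ResponseTracker is a hypothetical class. Each pending request gets a CompletableFuture that completes with the response or fails with a TimeoutException.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical tracker that correlates responses with requests by message ID. */
final class ResponseTracker {
    private final Map<String, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "response-timeouts");
        t.setDaemon(true); // do not keep the JVM alive just for timeouts
        return t;
    });

    /** Registers an outgoing request; the future fails with TimeoutException if no reply arrives. */
    CompletableFuture<byte[]> expect(String messageId, long timeoutMs) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        pending.put(messageId, future);
        timer.schedule(() -> {
            CompletableFuture<byte[]> f = pending.remove(messageId);
            if (f != null) {
                f.completeExceptionally(new TimeoutException("no response for " + messageId));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Called by the transport when a response arrives; returns false for unknown or late IDs. */
    boolean onResponse(String messageId, byte[] payload) {
        CompletableFuture<byte[]> f = pending.remove(messageId);
        return f != null && f.complete(payload);
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

Retries could be layered on top by re-sending when a future fails with a timeout, which is where the choice between at-most-once and at-least-once delivery would surface.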
>> >
>> > -Greg
>> >
>> >
>> >
>> > On Sat, Jul 12, 2014 at 11:27 AM, kishore g <g.kishore@gmail.com>
>> > wrote:
>> >
>> >> Hi Vlad,
>> >>
>> >> Yes, the idea is to have a pluggable architecture where Helix provides
>> >> the messaging channel without serializing/deserializing the data. In the
>> >> end, I envision having most of the building blocks needed to build a
>> >> distributed system. One should be able to build datastore, search, OLAP,
>> >> pub/sub, or stream processing systems easily on top of Helix.
>> >>
>> >> I am curious to see what kind of tuning you guys did to handle 1.5M
>> >> QPS. Our goal is to have 1M QPS between two servers. To achieve this we
>> >> might need pipelining/batching along with ordering guarantees. This is
>> >> non-trivial and error-prone, so I see huge value in providing this
>> >> feature. Most systems have built some form of replication scheme on
>> >> their own. There are two parts to replication: one is the high-throughput
>> >> message exchange, and the other is the scheme itself (synchronous
>> >> replication, asynchronous replication, chained replication, consensus,
>> >> etc.). The first part is common to all replication schemes.
>> >>
>> >> Of course, getting this right is not easy and the design is tricky, but
>> >> I feel it's worth a try.
>> >>
>> >> This is the high level flow I had discussed with Greg.
>> >>
>> >> Each server can host P partitions and listens on one port. Based on how
>> >> many execution threads are needed, we can create C channels. Every
>> >> partition maps to a specific channel, and on each channel we can
>> >> guarantee message ordering. We simply provide a callback to handle each
>> >> message and pass it the received bytes without deserializing them. It's
>> >> up to the callback to process them synchronously or asynchronously.
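The partition-to-channel flow could be sketched with one single-threaded executor per channel. Assumptions: ChannelDispatcher is a hypothetical name, partitions are assigned by hashing the partition name modulo C, and dispatch is called in arrival order (for example, from a single network I/O thread per connection). Under those assumptions, messages for one partition are handled strictly in order, while different channels proceed in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical dispatcher: C single-threaded channels, each partition pinned to one channel. */
final class ChannelDispatcher {
    private final List<ExecutorService> channels = new ArrayList<>();
    // User callback: (partition name, raw bytes). Nothing is deserialized here.
    private final BiConsumer<String, byte[]> callback;

    ChannelDispatcher(int channelCount, BiConsumer<String, byte[]> callback) {
        for (int i = 0; i < channelCount; i++) {
            channels.add(Executors.newSingleThreadExecutor());
        }
        this.callback = callback;
    }

    /** Every message for a given partition maps to the same channel. */
    int channelFor(String partition) {
        return Math.floorMod(partition.hashCode(), channels.size());
    }

    /** Tasks on one channel run sequentially, in the order they were submitted. */
    void dispatch(String partition, byte[] payload) {
        channels.get(channelFor(partition)).execute(() -> callback.accept(partition, payload));
    }

    /** Stops accepting work and waits for queued messages to drain. */
    boolean shutdownAndAwait(long timeoutMs) throws InterruptedException {
        channels.forEach(ExecutorService::shutdown);
        for (ExecutorService channel : channels) {
            if (!channel.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                return false;
            }
        }
        return true;
    }
}
```

Because the callback only receives raw bytes, decoding and any further asynchronous hand-off remain the callback's decision, as described above.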
>> >>
>> >> There are more details that are difficult to go over in email. We
>> >> should probably write up the design and then deep dive on the details.
>> >>
>> >> thanks,
>> >> Kishore G
>> >>
>> >>
>> >> On Sat, Jul 12, 2014 at 8:08 AM, vlad.gm@gmail.com <vlad.gm@gmail.com>
>> >> wrote:
>> >>
>> >> > I agree that having a messaging scheme that is not ZooKeeper-reliant
>> >> > would help in building some synchronization protocols between clients
>> >> > and replicas. I am thinking of cases such as writing inside Kafka to
>> >> > replicated partitions, or our own application, a replicated key/value
>> >> > store.
>> >> >
>> >> > For me, service discovery bridges the gap between an application
>> >> > wanting to address a service (in Helix's case, more specifically, a
>> >> > service/resource name/partition tuple) and finding a list of hostnames
>> >> > that can receive that request. After that, whether the application
>> >> > wants to use Helix itself or another messaging stack is an open
>> >> > problem; perhaps a clear separation of the discovery/messaging layers
>> >> > would allow for quickly swapping different pieces here.
>> >> >
>> >> > The interesting part (in my view) in providing fast messaging as part
>> >> > of Helix is the ability to use the service topology information that
>> >> > Helix hosts (number of replicas and their placement) together with the
>> >> > messaging layer as sufficient components for realizing a consensus
>> >> > algorithm between replicas. That is, on top of the regular service
>> >> > discovery function of writing to one instance of a service, it would
>> >> > provide the function of writing with consensus verification to all
>> >> > replicas of a service.
>> >> >
>> >> > In terms of capabilities, looking at Kafka and a KV store as use cases,
>> >> > this would mean running a whole protocol between the replicas of the
>> >> > service to achieve a certain synchronization outcome (be it Paxos,
>> >> > two-phase commit, or chain replication; Paxos would probably not be
>> >> > needed, since Helix already provides a resilient master for the
>> >> > partition, so there is a clear hierarchy). Kafka suggests even more
>> >> > cases if we take the interaction between client and service into
>> >> > account, namely specifying whether the call returns after the master
>> >> > gets the data or after consensus between master and replicas has been
>> >> > achieved, or sending data asynchronously (the combination of the
>> >> > producer.type and request.required.acks parameters of the producer).
>> >> >
>> >> > The only reason I would like to see a pluggable architecture is that
>> >> > in high-volume applications the messaging stack requires a lot of
>> >> > tuning. As I said at the meet-up, in our company we deal with about
>> >> > 1.5M QPS, so serialization/deserialization overhead per message, or
>> >> > even separate listening/executor threads, can be resource intensive.
>> >> > Whenever we can, we are tempted to use a staged event-driven
>> >> > architecture, with few threads that handle large message pools. This
>> >> > also raises the question of how to implement the upper-layer callbacks
>> >> > (that would perform the synchronization-related actions) without
>> >> > spawning new threads.
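A stage with few threads and a large message pool could look roughly like this. BatchingStage is hypothetical; it uses a bounded ArrayBlockingQueue, blocks for one message, then drains whatever else is ready (up to a batch limit) with BlockingQueue.drainTo, so callbacks run on a fixed set of threads rather than on a new thread per message. With more than one worker thread, batches run concurrently and ordering is not preserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical staged-event-driven stage: a bounded queue drained in batches by fixed threads. */
final class BatchingStage<T> {
    private final BlockingQueue<T> queue;
    private final List<Thread> workers = new ArrayList<>();

    BatchingStage(int capacity, int threads, int maxBatch, Consumer<List<T>> handler) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                List<T> batch = new ArrayList<>(maxBatch);
                try {
                    while (true) {
                        batch.add(queue.take());            // block for the first message
                        queue.drainTo(batch, maxBatch - 1); // then take whatever else is ready
                        handler.accept(batch);              // one callback invocation per batch
                        batch.clear();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();     // stop() was called
                }
            }, "stage-worker-" + i);
            t.setDaemon(true);
            workers.add(t);
            t.start();
        }
    }

    /** Returns false instead of blocking when the stage is full (back-pressure to the caller). */
    boolean offer(T message) {
        return queue.offer(message);
    }

    void stop() {
        workers.forEach(Thread::interrupt);
    }
}
```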
>> >> >
>> >> > Regards,
>> >> > Vlad
>> >> >
>> >> >
>> >> > On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kishore@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Good point, Vlad. I was thinking of defining the right abstraction and
>> >> >> possibly providing one implementation. I think synchronous and
>> >> >> asynchronous messaging should both be covered as part of this.
>> >> >>
>> >> >> Also, I think Finagle and the likes of it cater more toward
>> >> >> client-server communication, but what we lack today is a good solution
>> >> >> for peer-to-peer transport. For example, if someone has to build a
>> >> >> consensus layer, they have to build everything from the ground up.
>> >> >> Providing a base layer that exchanges messages between peers on a
>> >> >> per-partition basis can be a great building block for different
>> >> >> replication schemes.
>> >> >>
>> >> >> Overall, messaging can be used for two cases: data and control.
>> >> >>
>> >> >>
>> >> >> CONTROL
>> >> >>
>> >> >> This is needed for control messages that occur rarely. For these types
>> >> >> of messages, latency/throughput is not important, but it is important
>> >> >> for them to be reliable.
>> >> >>
>> >> >>
>> >> >> DATA
>> >> >>
>> >> >> This is mainly exchanging data between different roles:
>> >> >> controller-participant, participant-participant, and
>> >> >> spectator-participant. These types of message exchanges occur quite
>> >> >> frequently, and high throughput and low latency are requirements.
>> >> >>
>> >> >> For example, having the following API and guarantees would allow one
>> >> >> to build synchronous replication, asynchronous replication, quorum,
>> >> >> etc.
>> >> >>
>> >> >> send(partition, message, ACK_MODE, callback)
>> >> >>
>> >> >> ACK_MODE can be something like ACK from ALL, QUORUM, 1, NONE, etc. The
>> >> >> callback is fired when one gets the response back from the receiver.
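The ACK_MODE semantics could be sketched as the number of replica acknowledgements each mode needs before the callback fires. The names AckMode and AckTracker are hypothetical, ONE stands in for the "1" mode, and QUORUM is assumed here to mean a strict majority (n/2 + 1) of the partition's replicas.

```java
/** Hypothetical ACK_MODE values for send(partition, message, ACK_MODE, callback). */
enum AckMode {
    ALL, QUORUM, ONE, NONE;

    /** Number of replica acknowledgements required before the callback fires. */
    int requiredAcks(int replicas) {
        switch (this) {
            case ALL:    return replicas;
            case QUORUM: return replicas / 2 + 1; // assumed: strict majority
            case ONE:    return Math.min(1, replicas);
            default:     return 0;                // NONE: fire-and-forget
        }
    }
}

/** Counts acks for one message and fires the callback exactly once when the mode is satisfied. */
final class AckTracker {
    private final int required;
    private final Runnable callback;
    private int acks;
    private boolean fired;

    AckTracker(AckMode mode, int replicas, Runnable callback) {
        this.required = mode.requiredAcks(replicas);
        this.callback = callback;
        if (required == 0) {
            fire(); // NONE completes immediately
        }
    }

    synchronized void onAck() {
        acks++;
        if (acks >= required) {
            fire();
        }
    }

    private synchronized void fire() {
        if (!fired) {
            fired = true;
            callback.run();
        }
    }
}
```

A synchronous send is then just a caller that blocks until the callback runs, while an asynchronous one returns immediately, which matches the point that one API covers both modes.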
>> >> >>
>> >> >> This simple API can allow both synchronous and asynchronous modes of
>> >> >> communication, and we don't have to do the serialization/
>> >> >> deserialization. The hard part here would be guaranteeing ordering
>> >> >> between messages. One should be able to specify the message ordering
>> >> >> requirement: FIFO at the per-partition level, or no ordering. Having
>> >> >> this makes it easy for one to implement replication schemes.
>> >> >>
>> >> >>  thanks,
>> >> >> Kishore G
>> >> >>
>> >> >>
>> >> >> On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <brandt.greg@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >>> Vlad,
>> >> >>>
>> >> >>> I'm not sure that the goal here is to cover all possible use cases.
>> >> >>> This is intended basically as a replacement for the current Helix
>> >> >>> cluster messaging service, one which doesn't rely on ZooKeeper. I
>> >> >>> would argue that Helix is already that general framework for
>> >> >>> discovering the service endpoint, and that this is just one
>> >> >>> implementation of an underlying messaging stack (for which there is
>> >> >>> currently only the ZooKeeper-based implementation).
>> >> >>>
>> >> >>> Moreover, keeping it too general (i.e. providing the APIs outlined in
>> >> >>> the JIRA, but no implementation) puts a much greater onus on the user.
>> >> >>> It would be nice for someone just starting out with Helix to have a
>> >> >>> pretty good messaging service readily available, as well as the APIs
>> >> >>> to implement more specific solutions if he or she so chooses.
>> >> >>>
>> >> >>> -Greg
>> >> >>>
>> >> >>>
>> >> >>> On Fri, Jul 11, 2014 at 11:07 AM, vlad.gm@gmail.com <vlad.gm@gmail.com>
>> >> >>> wrote:
>> >> >>>
>> >> >>> > This sounds like service discovery for a messaging solution. At this
>> >> >>> > point there would be some overlap with message buses such as
>> >> >>> > Finagle. Perhaps this should be a general framework for discovering
>> >> >>> > the service endpoint that could leave the actual underlying
>> >> >>> > messaging stack open (be it Finagle, Netty-based, or ZeroMQ, for
>> >> >>> > example). My only fear is that if the messaging framework is
>> >> >>> > encapsulated completely into Helix, it would be hard to compete with
>> >> >>> > tuned messaging bus solutions and cover all possible use cases (for
>> >> >>> > example, in my company, we use a large number of sub-cases on the
>> >> >>> > synchronous-call-to-asynchronous-call spectrum).
>> >> >>> >
>> >> >>> > Regards,
>> >> >>> > Vlad
>> >> >>> >
>> >> >>> >
>> >> >>> > On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt <brandt.greg@gmail.com>
>> >> >>> > wrote:
>> >> >>> >
>> >> >>> >> (copied from HELIX-470)
>> >> >>> >>
>> >> >>> >> Helix is missing a high-performance way to exchange messages among
>> >> >>> >> resource partitions, with a user-friendly API.
>> >> >>> >>
>> >> >>> >> Currently, the Helix messaging service relies on creating many nodes
>> >> >>> >> in ZooKeeper, which can lead to ZooKeeper outages if messages are
>> >> >>> >> sent too frequently.
>> >> >>> >>
>> >> >>> >> In order to avoid this, high-performance NIO-based HelixActors should
>> >> >>> >> be implemented (in rough accordance with the actor model).
>> >> >>> >> HelixActors exchange messages asynchronously without waiting for a
>> >> >>> >> response, and are partition/state-addressable.
>> >> >>> >>
>> >> >>> >> The API would look something like this:
>> >> >>> >>
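>> >> >>> >> // Partition is the Helix partition type; state is a state-model
>> >> >>> >> // state name such as "MASTER". These are the only routing metadata.
>> >> >>> >> public interface HelixActor<T> {
>> >> >>> >>     // Send a message to the replicas of partition currently in state
>> >> >>> >>     void send(Partition partition, String state, T message);
>> >> >>> >>     // Register a callback for messages addressed to resource
>> >> >>> >>     void register(String resource, HelixActorCallback<T> callback);
>> >> >>> >> }
>> >> >>> >>
>> >> >>> >> public interface HelixActorCallback<T> {
>> >> >>> >>     // Invoked with the message already decoded by the user's codec
>> >> >>> >>     void onMessage(Partition partition, String state, T message);
>> >> >>> >> }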
>> >> >>> >>
>> >> >>> >> #send should likely support wildcards for partition number and state,
>> >> >>> >> or its method signature might need to be massaged a little for more
>> >> >>> >> flexibility. But that's the basic idea.
>> >> >>> >>
>> >> >>> >> Nothing is inferred about the format of the messages; the only
>> >> >>> >> metadata we need to be able to interpret is (1) partition name and
>> >> >>> >> (2) state. The user provides a codec to encode/decode messages, so
>> >> >>> >> it's nicer to implement HelixActor#send and
>> >> >>> >> HelixActorCallback#onMessage.
>> >> >>> >>
>> >> >>> >> public interface HelixActorMessageCodec<T> {
>> >> >>> >>     byte[] encode(T message);
>> >> >>> >>     T decode(byte[] message);
>> >> >>> >> }
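For illustration, a UTF-8 string codec implementing the proposed interface might look like this. The interface is restated so the snippet compiles on its own; Utf8StringCodec is a hypothetical example, not part of the proposal.

```java
import java.nio.charset.StandardCharsets;

/** The codec interface as proposed above. */
interface HelixActorMessageCodec<T> {
    byte[] encode(T message);

    T decode(byte[] message);
}

/** Hypothetical example codec for plain-text messages. */
final class Utf8StringCodec implements HelixActorMessageCodec<String> {
    @Override
    public byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

Since the actor only ever sees byte[], a binary format (Avro, Protocol Buffers, or a hand-rolled layout) could be plugged in the same way without any change to HelixActor.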
>> >> >>> >>
>> >> >>> >> Actors should support somewhere around 100k to 1M messages per
>> >> >>> >> second. The Netty framework is a potential implementation candidate,
>> >> >>> >> but it should be thoroughly evaluated w.r.t. performance.
>> >> >>> >>
>> >> >>> >
>> >> >>> >
>> >> >>>
>> >> >>
>> >> >>
>> >> >
>> >>
>>
>
>
