helix-user mailing list archives

From kishore g <g.kish...@gmail.com>
Subject Re: Add performant IPC (Helix actors)
Date Thu, 21 Aug 2014 21:07:35 GMT
Should we move the cross-data-center ZK discussion to a separate thread? This
is an interesting problem and probably needs more detailed discussion.

What do you guys think?


On Wed, Aug 20, 2014 at 7:44 PM, Kanak Biscuitwala <kanak.b@hotmail.com>
wrote:

> Hey Vlad,
>
> In theory, we should be able to plug in a resolver that can resolve ZKs in
> other datacenters. We already resolve different clusters within the same
> ZK. Here is the resolver interface:
>
>
> https://github.com/brandtg/helix-actors/blob/master/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
>
> Right now the ZK implementation of the resolver can return a different
> Helix manager for different clusters, and perhaps it can be adapted to also
> accept a different ZK address from a different DC. Then we can connect to
> ZK observers. However, the message scope probably needs an additional
> resolver-specific metadata field or something that we can use to store the
> ZK observer addresses (and therefore a corresponding field in a message).
>
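The metadata idea above might look something like this minimal sketch. The names (`MessageScope`, `MetadataAwareResolver`, the `zkAddress` key) are illustrative assumptions, not the actual HelixResolver API; the point is just that a scope carrying resolver-specific metadata lets the resolver pick a remote DC's ZK observer instead of the local ZK:

```java
import java.util.HashMap;
import java.util.Map;

public class ScopedResolverSketch {
    // Hypothetical message scope carrying resolver-specific metadata,
    // e.g. a remote datacenter's ZK observer address.
    static class MessageScope {
        final String cluster;
        final Map<String, String> metadata = new HashMap<>();
        MessageScope(String cluster) { this.cluster = cluster; }
    }

    interface Resolver {
        String resolve(MessageScope scope); // returns a ZK address for the scope
    }

    // In-memory stand-in: use the ZK address from scope metadata if present,
    // otherwise fall back to the local ZK ensemble.
    static class MetadataAwareResolver implements Resolver {
        private final String localZk;
        MetadataAwareResolver(String localZk) { this.localZk = localZk; }
        public String resolve(MessageScope scope) {
            return scope.metadata.getOrDefault("zkAddress", localZk);
        }
    }

    public static void main(String[] args) {
        MetadataAwareResolver r = new MetadataAwareResolver("zk-local:2181");
        MessageScope remote = new MessageScope("clusterB");
        remote.metadata.put("zkAddress", "zk-dc2-observer:2181");
        System.out.println(r.resolve(new MessageScope("clusterA"))); // zk-local:2181
        System.out.println(r.resolve(remote));                       // zk-dc2-observer:2181
    }
}
```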
> Regarding ZK vs DNS, push updates via watchers are an advantage, as you
> pointed out. Also, since Helix persists state in ZK, you have access to the
> entire cluster metadata in the context of your requests. This could
> certainly be implemented in a DNS-like scheme, though. Perhaps others can
> chime in on the tradeoffs here.
>
> Kanak
>
> ------------------------------
> From: brandt.greg@gmail.com
> Date: Wed, 20 Aug 2014 18:16:55 -0700
> Subject: Re: Add performant IPC (Helix actors)
> To: user@helix.apache.org
> CC: dev@helix.apache.org
>
>
> Hey Vlad,
>
> Correct, ZooKeeper would still be used for the address resolution
> component.
>
> Cross-datacenter hasn't really been thought of yet, but you do make a good
> point in that we should address this early because it usually seems to be
> an afterthought. However, we are planning to allow cross-cluster messaging
> within the same ZooKeeper.
>
> Something that might be interesting to do in the cross-datacenter use case
> is to replicate each datacenter's ZooKeeper logs to each other datacenter
> in order to maintain a read-only view of that ZooKeeper cluster. One could
> then resolve addresses with only connections to local ZooKeeper(s), though
> the replication delay would have to be accounted for during state
> transitions.
>
> I've got a small project that does generic log change capture, and I intend
> to do ZooKeeper log replication next, actually. It has MySQL binary log and
> HDFS namenode edit log capture now (https://github.com/brandtg/switchboard).
> Just an idea.
>
> -Greg
>
>
> On Wed, Aug 20, 2014 at 6:00 PM, vlad.gm@gmail.com <vlad.gm@gmail.com>
> wrote:
>
>
> Hi Greg,
>
> Within Turn, a few of us were looking at Helix as a possible solution for
> service discovery. It seems that the new system would keep the address
> resolution of the old messaging system but replace ZooKeeper with a direct
> connection for high throughput. Incidentally, we have a similar system, so
> we are more interested in the addressing part.
>
> One thing that came up within our discussion was the possibility of
> accessing services in other datacenters. Are there plans to offer
> inter-datacenter discovery capabilities? Would that be based on ZK
> observers or some other import mechanism (someone would need to connect to
> the remote ZK, preferably not the client)?
>
> Also, especially in the across-datacenter case, there are clear parallels
> to using DNS for service discovery (I think Netflix published a blog post
> where they detailed this as an option). What would we gain and lose by
> using ZK as the building block as opposed to DNS? One thing that comes to
> mind is that ZK-based service discovery in Helix can be configured as a
> push-from-ZK or a poll-by-client service, while DNS entries time out and
> must always be polled again.
>
> Regards,
> Vlad
>
>
> On Wed, Aug 20, 2014 at 5:46 PM, Greg Brandt <brandt.greg@gmail.com>
> wrote:
>
> Hey Henry,
>
> Initially, I thought the same thing. But after evaluating Akka Actors, I
> think they're a fundamentally different approach to developing distributed
> systems as opposed to simply a messaging layer implementation. Let me
> explain...
>
> The primary difference between Akka Actors and Helix is that the former
> prefers loose, decentralized control, whereas the latter prefers strong,
> centralized control. Akka applications are expected to manage state by
> themselves, whereas in Helix, that responsibility is delegated to the
> controller. And the actual user Akka Actor implementations are analogous to
> Helix Participant implementations.
>
> So with that perspective, it would be a bit of a kludge to leverage Akka
> Actors for simple cluster-scope-addressable message passing. Actually,
> Helix already provides this messaging feature, but it is done via ZooKeeper
> which is dangerous and less-than-performant for a high volume of messages.
>
> The right thing to do seems to be to provide a set of better transport
> implementations (e.g. Netty, ActiveMQ, Kafka, etc.) for the
> cluster-scope-addressable messaging system, and better preserve the
> fundamental concepts in Helix.
>
> Let me know what you think.
>
> -Greg
>
>
>
> On Wed, Aug 20, 2014 at 9:55 AM, Henry Saputra <henry.saputra@gmail.com>
> wrote:
>
> This seems like a fit for the Akka actor system [1]. Maybe we could just
> use it as a base?
>
>
> - Henry
>
> [1] http://akka.io
>
> On Mon, Jul 21, 2014 at 9:47 AM, Greg Brandt <brandt.greg@gmail.com>
> wrote:
> > We talked about this in a little more detail, and it seems like there are
> > three basic components that we want:
> >
> > 1. Transport - physically moving messages
> > 2. Resolution - map cluster scope to set of machines
> > 3. Management - message state, errors, retries, etc.
> >
> > The transport interface should be generic enough to allow many
> > implementations (e.g. Netty, Kafka, etc.). It may be valuable to do
> > multiple implementations if one underlying system supports desired
> > message delivery guarantees better than another. For example, Kafka would
> > be more amenable to at-least-once delivery, whereas something like Netty,
> > with much less overhead than Kafka, would be better for at-most-once
> > delivery.
> >
> > The goal here, I think, is to provide something that's most similar to
> > Akka Actors, with the following guarantees:
> >
> > 1. at-most-once delivery
> > 2. message ordering per sender-receiver pair
> >
> > A light-weight messaging primitive with those guarantees would allow
> > implementation of many interesting things like Kishore mentions. Netty
> > seems most suited to this task (it is the underlying transport for Akka
> > as well). The reason to avoid Akka, I think, is that it imposes too much
> > of its own philosophy on the user, and is really bloated. A Netty-based
> > primitive could be provided out-of-the-box, and the interfaces it
> > implements should be easy enough for users to implement on their own. It
> > would be relatively trivial to add a Kafka-based implementation
> > out-of-the-box as well.
> >
> > (Have a bit of a prototype here:
> > https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/actor/netty/NettyHelixActor.java
> > )
> >
> > The transport mechanism should be agnostic to anything going on in the
> > cluster: the resolution module is totally separate, and maps cluster
> > scope to sets of nodes. For example, one could send a message to the
> > entire cluster, specific partitions of a resource, or even from one
> > cluster to another (via composing resolution modules or something).
> >
> > Then there would be a management component, composed with the other two
> > components, which manages user callbacks associated with varying cluster
> > scopes, tracks messages based on ID in order to perform request /
> > response or feedback-loop kinds of things, manages errors / timeouts /
> > retries, etc.
> >
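The three components above could be sketched as interfaces along these lines. All names and signatures here are illustrative assumptions for the discussion, not an actual Helix API; the composition at the bottom shows how management-level fan-out would use resolution plus transport:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

public class IpcLayersSketch {
    interface Transport {                // 1. physically moves message bytes
        void send(String address, byte[] payload);
    }
    interface Resolution {               // 2. maps cluster scope -> set of machines
        Set<String> resolve(String scope);
    }
    interface Management {               // 3. callbacks, message state, retries
        void register(String scope, Consumer<byte[]> callback);
    }

    // Composition: resolve a scope to addresses, then send over the transport.
    static void broadcast(Transport t, Resolution r, String scope, byte[] msg) {
        for (String addr : r.resolve(scope)) {
            t.send(addr, msg);
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        Transport t = (addr, payload) -> delivered.add(addr);
        Resolution r = scope -> new TreeSet<>(List.of("host1:1234", "host2:1234"));
        broadcast(t, r, "myResource", "hello".getBytes());
        System.out.println(delivered); // [host1:1234, host2:1234]
    }
}
```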
> > -Greg
> >
> >
> >
> > On Sat, Jul 12, 2014 at 11:27 AM, kishore g <g.kishore@gmail.com> wrote:
> >
> >> Hi Vlad,
> >>
> >> Yes, the idea is to have a pluggable architecture where Helix provides
> >> the messaging channel without serializing/deserializing the data. In the
> >> end, I envision having most of the building blocks needed to build a
> >> distributed system. One should be able to build a
> >> datastore/search/olap/pub-sub/stream processing system easily on top of
> >> Helix.
> >>
> >> I am curious to see what kind of tuning you guys did to handle 1.5M
> >> QPS. Our goal is to have 1M QPS between the two servers. In order to
> >> achieve this we might need pipelining/batching along with ordering
> >> guarantees. This is non-trivial and error-prone, so I see huge value in
> >> providing this feature. Most systems have built some form of replication
> >> scheme on their own. There are two parts to replication: one is the
> >> high-throughput message exchange, and the other is the scheme itself
> >> (synchronous replication, async replication, chained replication,
> >> consensus, etc.). The first part is the common part that is needed
> >> across all the replication schemes.
> >>
> >> Of course getting this right is not easy and the design is tricky, but
> >> I feel it's worth a try.
> >>
> >> This is the high level flow I had discussed with Greg.
> >>
> >> Each server can host P partitions and is listening on one port. Based on
> >> how many execution threads are needed, we can create C channels. Every
> >> partition maps to a specific channel. On each channel we can guarantee
> >> message ordering. We simply provide a callback to handle each message
> >> and pass the received bytes without deserializing. It's up to the
> >> callback to either process it synchronously or asynchronously.
> >>
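The partition-to-channel mapping in that flow can be sketched in a few lines. The modulo scheme below is an assumption about how the mapping might work, but it shows the key property: every partition lands on exactly one channel, so per-channel FIFO gives per-partition ordering for free:

```java
public class ChannelMappingSketch {
    // Map each of P partitions onto one of C channels. Because a partition
    // always hashes to the same channel, and each channel preserves FIFO
    // order, all messages for a given partition are delivered in order.
    static int channelFor(int partition, int numChannels) {
        return partition % numChannels;
    }

    public static void main(String[] args) {
        int C = 4; // C channels backing, say, 8 partitions
        for (int p = 0; p < 8; p++) {
            System.out.println("partition " + p + " -> channel " + channelFor(p, C));
        }
    }
}
```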
> >> There are more details that are difficult to go over in email. We should
> >> probably write up the design and then deep-dive on the details.
> >>
> >> thanks,
> >> Kishore G
> >>
> >> On Sat, Jul 12, 2014 at 8:08 AM, vlad.gm@gmail.com <vlad.gm@gmail.com>
> >> wrote:
> >>
> >> > I agree that having a messaging scheme that is not ZooKeeper-reliant
> >> > would help in building some synchronization protocols between clients
> >> > and replicas. I am thinking of cases such as writing inside Kafka to
> >> > replicated partitions, or our own application, a replicated Key/Value
> >> > store.
> >> >
> >> > For me, service discovery bridges the gap between an application
> >> > wanting to address a service (in Helix's case more particularly, a
> >> > service/resource name/partition tuple) and finding out a list of
> >> > hostnames that can receive that request. After that, whether the
> >> > application wants to use Helix itself or another messaging stack is an
> >> > open problem; perhaps a clear separation of the discovery/messaging
> >> > layers would allow for quickly swapping different pieces here.
> >> >
> >> > The interesting part (in my view) of providing fast messaging as part
> >> > of Helix is the ability to use the service topology information that
> >> > Helix hosts (number of replicas and their placement) together with the
> >> > messaging layer as sufficient components for realizing a consensus
> >> > algorithm between replicas: that is, providing, on top of the regular
> >> > service discovery function of writing to one instance of a service,
> >> > the function of writing with consensus verification to all replicas of
> >> > a service.
> >> >
> >> > In terms of capabilities, looking at Kafka and a KV store as use
> >> > cases, this would mean running a whole protocol between the replicas
> >> > of the service to achieve a certain synchronization outcome (be it
> >> > Paxos, 2-Phase Commit or Chain Replication; Paxos would probably not
> >> > be needed since Helix already provides a resilient master for the
> >> > partition, so there is a clear hierarchy). Kafka suggests even more
> >> > cases if we take the interaction between client and service into
> >> > account, namely specifying the call to return after the master gets
> >> > the data, or after consensus between master and replicas has been
> >> > achieved, or sending data asynchronously (the combination of the
> >> > producer.type and request.required.acks parameters of the producer).
> >> >
> >> > The only reason for which I would like to see a pluggable
> >> > architecture is that in high-volume applications the messaging stack
> >> > requires a lot of tuning. As I said at the meet-up, in our company we
> >> > deal with about 1.5M QPS, so having serialization/deserialization
> >> > overheads per message, or even separate listening/executor threads,
> >> > can be resource intensive. Whenever we can, we are tempted to use a
> >> > staged event-driven architecture, with few threads that handle large
> >> > message pools. This also raises the question of how to implement the
> >> > upper-layer callbacks (that would perform the synchronization-related
> >> > actions) without spawning new threads.
> >> >
> >> > Regards,
> >> > Vlad
> >> >
> >> >
> >> > On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kishore@gmail.com>
> wrote:
> >> >
> >> >> Good point Vlad. I was thinking of defining the right abstraction
> >> >> and possibly providing one implementation. I think synchronous and
> >> >> asynchronous messaging should be covered as part of this.
> >> >>
> >> >> Also, I think Finagle and the likes of it cater more towards
> >> >> client-server communication, but what we lack today is a good
> >> >> solution for peer-to-peer transport. For example, if someone has to
> >> >> build a consensus layer, they have to build everything from the
> >> >> ground up. Providing a base layer that exchanges messages between
> >> >> peers on a per-partition basis can be a great building block for
> >> >> different replication schemes.
> >> >>
> >> >> Overall, messaging can be used for two cases: data and control.
> >> >>
> >> >>
> >> >> CONTROL
> >> >>
> >> >> This is needed for control messages that occur rarely. For these
> >> >> types of messages, latency/throughput is not important, but it is
> >> >> important for them to be reliable.
> >> >>
> >> >>
> >> >> DATA
> >> >>
> >> >> This is mainly exchanging data between different roles:
> >> >> controller-participant, participant-participant,
> >> >> spectator-participant. These types of message exchanges occur quite
> >> >> frequently, and high throughput and low latency are a requirement.
> >> >>
> >> >> For example, having the following API and guarantees would allow one
> >> >> to build synchronous replication, asynchronous replication, quorum,
> >> >> etc.:
> >> >>
> >> >> send(partition, message, ACK_MODE, callback)
> >> >>
> >> >> ACK_MODE can be ACK from ALL, QUORUM, 1, NONE, etc. The callback is
> >> >> fired when one gets the response back from the receiver.
> >> >>
> >> >> This simple API can allow both synchronous and asynchronous modes of
> >> >> communication. We don't have to do the serialization/deserialization.
> >> >> The hard part here would be to guarantee ordering between the
> >> >> messages. One should be able to specify the message ordering
> >> >> requirement: FIFO on a per-partition level, or don't care about
> >> >> ordering. Having this makes it easy for one to implement replication
> >> >> schemes.
> >> >>
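The send API above might look like the following sketch. The `AckMode` enum values mirror the ACK_MODE options Kishore lists; everything else (type names, the in-memory stand-in that acks immediately) is an illustrative assumption, since a real implementation would wait for the requested acknowledgement level before firing the callback:

```java
public class AckModeSketch {
    // Acknowledgement levels from the email: ACK from ALL, QUORUM, 1, NONE.
    enum AckMode { ALL, QUORUM, ONE, NONE }

    interface Callback {
        void onResponse(int partition, byte[] response);
    }

    // In-memory stand-in for the transport. A real implementation would
    // track outstanding acks per message ID and fire the callback only once
    // the requested AckMode is satisfied.
    static void send(int partition, byte[] message, AckMode mode, Callback cb) {
        if (mode != AckMode.NONE) {
            cb.onResponse(partition, "ack".getBytes());
        }
    }

    public static void main(String[] args) {
        final boolean[] acked = {false};
        // Synchronous-replication style: wait for a quorum of replicas.
        send(0, "put k v".getBytes(), AckMode.QUORUM,
             (p, resp) -> acked[0] = true);
        System.out.println(acked[0]); // true
    }
}
```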
> >> >>  thanks,
> >> >> Kishore G
> >> >>
> >> >>
> >> >> On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <brandt.greg@gmail.com>
> >> >> wrote:
> >> >>
> >> >>> Vlad,
> >> >>>
> >> >>> I'm not sure that the goal here is to cover all possible use cases.
> >> >>> This is intended as basically a replacement for the current Helix
> >> >>> cluster messaging service, one that doesn't rely on ZooKeeper. I
> >> >>> would argue that Helix is already that general framework for
> >> >>> discovering the service endpoint, and that this is just one
> >> >>> implementation of an underlying messaging stack (for which there is
> >> >>> currently only the ZooKeeper-based implementation).
> >> >>>
> >> >>> Moreover, keeping it too general (i.e. providing the APIs outlined
> >> >>> in the JIRA, but no implementation) puts a much greater onus on the
> >> >>> user. It would be nice for someone just starting out with Helix to
> >> >>> have a pretty good messaging service readily available, as well as
> >> >>> the APIs to implement more specific solutions if he or she so
> >> >>> chooses.
> >> >>>
> >> >>> -Greg
> >> >>>
> >> >>>
> >> >>> On Fri, Jul 11, 2014 at 11:07 AM, vlad.gm@gmail.com
> >> >>> <vlad.gm@gmail.com> wrote:
> >> >>>
> >> >>> > This sounds like service discovery for a messaging solution. At
> >> >>> > this point there would be some overlap with message buses such as
> >> >>> > Finagle. Perhaps this should be a general framework for
> >> >>> > discovering the service endpoint that could leave the actual
> >> >>> > underlying messaging stack open (be it Finagle, Netty-based or
> >> >>> > ZeroMQ, for example). My only fear is that if the messaging
> >> >>> > framework is encapsulated completely into Helix, it would be hard
> >> >>> > to compete with tuned messaging bus solutions and cover all
> >> >>> > possible use cases (for example, in my company, we use a large
> >> >>> > number of sub-cases on the synchronous-call to asynchronous-call
> >> >>> > spectrum).
> >> >>> >
> >> >>> > Regards,
> >> >>> > Vlad
> >> >>> >
> >> >>> >
> >> >>> > On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt
> >> >>> > <brandt.greg@gmail.com> wrote:
> >> >>> >
> >> >>> >> (copied from HELIX-470)
> >> >>> >>
> >> >>> >> Helix is missing a high-performance way to exchange messages
> >> >>> >> among resource partitions, with a user-friendly API.
> >> >>> >>
> >> >>> >> Currently, the Helix messaging service relies on creating many
> >> >>> >> nodes in ZooKeeper, which can lead to ZooKeeper outages if
> >> >>> >> messages are sent too frequently.
> >> >>> >>
> >> >>> >> In order to avoid this, high-performance NIO-based HelixActors
> >> >>> >> should be implemented (in rough accordance with the actor model).
> >> >>> >> HelixActors exchange messages asynchronously without waiting for
> >> >>> >> a response, and are partition/state-addressable.
> >> >>> >>
> >> >>> >> The API would look something like this:
> >> >>> >>
> >> >>> >> public interface HelixActor<T> {
> >> >>> >>     void send(Partition partition, String state, T message);
> >> >>> >>     void register(String resource, HelixActorCallback<T>
> callback);
> >> >>> >> }
> >> >>> >> public interface HelixActorCallback<T> {
> >> >>> >>     void onMessage(Partition partition, State state, T
message);
> >> >>> >> }
> >> >>> >>
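To make the send/register flow concrete, here is a toy in-memory stand-in for the interface quoted above. `Partition` and `State` are simplified to plain strings, and the routing scheme (resource name extracted from a `resource_N` partition name) is an assumption for illustration, not the Helix implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class InMemoryActorSketch {
    interface HelixActorCallback<T> {
        void onMessage(String partition, String state, T message);
    }

    static class InMemoryActor<T> {
        // resource -> callback registered for that resource's partitions
        private final Map<String, HelixActorCallback<T>> callbacks = new HashMap<>();

        void register(String resource, HelixActorCallback<T> callback) {
            callbacks.put(resource, callback);
        }

        // Assumes partition names look like "resource_0"; routes on the
        // resource part and hands the message to the registered callback.
        void send(String partition, String state, T message) {
            String resource = partition.substring(0, partition.lastIndexOf('_'));
            HelixActorCallback<T> cb = callbacks.get(resource);
            if (cb != null) {
                cb.onMessage(partition, state, message);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryActor<String> actor = new InMemoryActor<>();
        actor.register("myDB", (p, s, m) ->
            System.out.println(p + "@" + s + ": " + m));
        actor.send("myDB_0", "MASTER", "hello"); // myDB_0@MASTER: hello
    }
}
```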
> >> >>> >> #send should likely support wildcards for partition number and
> >> >>> >> state, or its method signature might need to be massaged a
> >> >>> >> little bit for more flexibility. But that's the basic idea.
> >> >>> >>
> >> >>> >> Nothing is inferred about the format of the messages: the only
> >> >>> >> metadata we need to be able to interpret is (1) partition name
> >> >>> >> and (2) state. The user provides a codec to encode/decode
> >> >>> >> messages, so it's nicer to implement HelixActor#send and
> >> >>> >> HelixActorCallback#onMessage.
> >> >>> >>
> >> >>> >> public interface HelixActorMessageCodec<T> {
> >> >>> >>     byte[] encode(T message);
> >> >>> >>     T decode(byte[] message);
> >> >>> >> }
> >> >>> >>
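A trivial String codec shows how the interface quoted above would be used. The nested interface mirrors the one in the proposal; the `StringCodec` class and the sample message are illustrative:

```java
import java.nio.charset.StandardCharsets;

public class StringCodecExample {
    // Mirrors the codec interface from the HELIX-470 proposal.
    interface HelixActorMessageCodec<T> {
        byte[] encode(T message);
        T decode(byte[] message);
    }

    // Simplest possible codec: UTF-8 bytes of a String. A real application
    // would plug in Avro, protobuf, JSON, etc. behind the same interface.
    static class StringCodec implements HelixActorMessageCodec<String> {
        public byte[] encode(String message) {
            return message.getBytes(StandardCharsets.UTF_8);
        }
        public String decode(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        StringCodec codec = new StringCodec();
        byte[] wire = codec.encode("partition_0:OFFLINE->ONLINE");
        System.out.println(codec.decode(wire)); // partition_0:OFFLINE->ONLINE
    }
}
```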
> >> >>> >> Actors should support somewhere around 100k to 1M messages per
> >> >>> >> second. The Netty framework is a potential implementation
> >> >>> >> candidate, but should be thoroughly evaluated w.r.t. performance.
> >> >>> >>
