helix-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kishore g <g.kish...@gmail.com>
Subject Re: Add performant IPC (Helix actors)
Date Fri, 22 Aug 2014 20:42:31 GMT
Thanks Greg for explanation. I agree Akka is 2 or 3 layers above what
Helix-ipc is trying to provide. Also there are some nuances in order
guarantees with Akka. For control messages where reliability is a must and
probably more important than latency, I think using ZK as the communication
channel is the right solution. Without pushing state transition message
through a consensus system like ZK, there will be lot of edge cases where
ensuring correctness becomes impossible.









On Fri, Aug 22, 2014 at 1:29 PM, Greg Brandt <brandt.greg@gmail.com> wrote:

> Hey Henry,
>
> So Akka actors actually give exactly the same messaging guarantees as the
> current implementation of Helix IPC: at-most-once message delivery, and
> ordering per sender-receiver pair. In Helix's case, the latter can be
> thought of further as ordering per-partition or resource, but basically is
> guaranteed by Netty's Channel implementation for sender-receiver pair (
>
> http://stackoverflow.com/questions/9082101/do-messages-sent-via-nettys-channel-write-preserve-their-order-when-begin-sen
> ).
>
> What Akka provides that this IPC layer doesn't is the application life
> cycle management part. This is why I made the claim that Akka Actors are
> analogous to Helix Participants. One writes callbacks for different message
> types in the Actor API to manage various control messages in the same way
> one writes callbacks in Helix to manage application state transitions.
>
> But more importantly, using Akka instead of getting really down into the
> transport layer might hide too much when we want to use it for
> performance-critical things like data replication. Continuing with that
> example, we likely want to use direct IO and minimize data copies in the
> transport layer, and it seems like some of the niceness involved in Akka's
> API would get in the way of that.
>
> Something that may be really cool to do is write another Akka transport
> implementation on top of this, so you could basically add partition/state
> to the Akka API and not have to implement the state machines yourself (as
> is shown in some Akka docs). In that scenario, users should probably be
> unaware that they were using Helix and just use the same Akka API.
>
> -Greg
>
>
> On Fri, Aug 22, 2014 at 11:13 AM, Henry Saputra <henry.saputra@gmail.com>
> wrote:
>
> > Hi Greg,
> >
> > Thanks for the reply. Sorry for late for the discussions.
> >
> > If the IPC semantic will be used only by Helix internally I agree that
> > lower level transport like Netty should be the way to go.
> >
> > For control plane layer, such as communications between controller and
> > participants, wouldn't Akka actors give benefits of already reliable
> > message passing and monitoring without building from ground up?
> > Akka actor system has many different routers and allow callback to
> > caller to get async response (in this case return back response to
> > controller).
> >
> > I just saw the similarity of what the new API looks like with what
> > Akka actor APIs.
> >
> > Thanks,
> >
> > - Henry
> >
> > On Wed, Aug 20, 2014 at 5:46 PM, Greg Brandt <brandt.greg@gmail.com>
> > wrote:
> > > Hey Henry,
> > >
> > > Initially, I thought the same thing. But after evaluating Akka Actors,
> I
> > > think they're a fundamentally different approach to developing
> > distributed
> > > systems as opposed to simply a messaging layer implementation. Let me
> > > explain...
> > >
> > > The primary difference between Akka Actors and Helix is that the former
> > > prefers loose, decentralized control, whereas the latter prefers
> strong,
> > > centralized control. Akka applications are expected to manage state by
> > > themselves, whereas in Helix, that responsibility is delegated to the
> > > controller. And the actual user Akka Actor implementations are
> analogous
> > to
> > > Helix Participant implementations.
> > >
> > > So with that perspective, it would be a bit of a kludge to leverage
> Akka
> > > Actors for simple cluster-scope-addressable message passing. Actually,
> > > Helix already provides this messaging feature, but it is done via
> > ZooKeeper
> > > which is dangerous and less-than-performant for a high volume of
> > messages.
> > >
> > > The right thing to do seems to be to provide a set of better transport
> > > implemenations (e.g. Netty, ActiveMQ, Kafka, etc.) for the
> > > cluster-scope-addressable messaging system, and better preserve the
> > > fundamental concepts in Helix.
> > >
> > > Let me know what you think.
> > >
> > > -Greg
> > >
> > >
> > >
> > > On Wed, Aug 20, 2014 at 9:55 AM, Henry Saputra <
> henry.saputra@gmail.com>
> > > wrote:
> > >
> > >> This seems fitting for Akka actor system [1]. Maybe we could just use
> > >> it as base?
> > >>
> > >>
> > >> - Henry
> > >>
> > >> [1] http://akka.io
> > >>
> > >> On Mon, Jul 21, 2014 at 9:47 AM, Greg Brandt <brandt.greg@gmail.com>
> > >> wrote:
> > >> > We talked about this in a little bit greater detail, and it's
> seeming
> > >> like
> > >> > there are three basic components that we want:
> > >> >
> > >> > 1. Transport - physically moving messages
> > >> > 2. Resolution - map cluster scope to set of machines
> > >> > 3. Management - message state, errors, retries, etc.
> > >> >
> > >> > The transport interface should be generic enough to allow many
> > >> > implementations (e.g. Netty, Kafka, etc.) It may be valuable to do
> > >> multiple
> > >> > implementations if one underlying system supports desired message
> > >> delivery
> > >> > guarantees better than another. For example, Kafka would be more
> > amenable
> > >> > to at-least-once delivery, whereas something like Netty, with much
> > less
> > >> > overhead than Kafka, would be better for at-most-once delivery.
> > >> >
> > >> > The goal here I think is to provide something that's most similar
to
> > Akka
> > >> > Actors, with the following guarantees:
> > >> >
> > >> > 1. at-most-once delivery
> > >> > 2. message ordering per sender-receiver pair
> > >> >
> > >> > A light-weight messaging primitive with those guarantees would allow
> > >> > implementation of many interesting things like Kishore mentions.
> Netty
> > >> > seems most suited to this task (it is the underlying transport for
> > Akka
> > >> as
> > >> > well). The reason to avoid Akka I think is that it imposes too much
> of
> > >> its
> > >> > own philosophy on the user, and is really bloated. A Netty-based
> > >> primitive
> > >> > could be provided out-of-the-box, and the interfaces it implements
> > should
> > >> > be easy enough for users to implement on their own. It would be
> > >> relatively
> > >> > trivial to add a Kafka-based implementation out-of-the-box as well.
> > >> >
> > >> > (Have a bit of a prototype here:
> > >> >
> > >>
> >
> https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/actor/netty/NettyHelixActor.java
> > >> > )
> > >> >
> > >> > The transport mechanism should be agnostic to anything going on in
> the
> > >> > cluster: the resolution module is totally separate, and maps cluster
> > >> scope
> > >> > to sets of nodes. For example, one could send a message to the
> entire
> > >> > cluster, specific partitions of a resource, or even from one cluster
> > to
> > >> > another (via composing resolution modules or something).
> > >> >
> > >> > Then there would be a management component, composed with the other
> > two
> > >> > components, which manages user callbacks associated with varying
> > cluster
> > >> > scopes, tracks messages based on ID in order to perform request /
> > >> response
> > >> > or feedback-loop kinds of things, manages errors / timeouts /
> retries,
> > >> etc.
> > >> >
> > >> > -Greg
> > >> >
> > >> >
> > >> >
> > >> > On Sat, Jul 12, 2014 at 11:27 AM, kishore g <g.kishore@gmail.com>
> > wrote:
> > >> >
> > >> >> Hi Vlad,
> > >> >>
> > >> >> Yes the idea is to have a pluggable architecture where Helix
> provides
> > >> the
> > >> >> messaging channel without serializing/deserializing the data.
In
> the
> > >> end, I
> > >> >> envision having most of the building blocks needed to build a
> > >> distributed
> > >> >> system. One should be able to build a datastore/search/olap/pu
> > >> sub/stream
> > >> >> processing systems easily on top of Helix.
> > >> >>
> > >> >> I am curious to see what kind of tuning you guys did to handle
1.5
> > QPS.
> > >> Our
> > >> >> goal is to have 1M QPS between the two servers. In order to achieve
> > >> this we
> > >> >> might need pipelining/batching along with ordering guarantee's.
> This
> > is
> > >> non
> > >> >> trivial and error prone, I see a huge value in providing this
> > feature.
> > >> Most
> > >> >> systems have built some form replication schemes on their own.
> There
> > are
> > >> >> two parts in the replication, one is the high through put message
> > >> exchange
> > >> >> and the other is the scheme that is synchronous replication/async
> > >> >> replication/chained replication/consensus etc. The first part
is
> the
> > >> common
> > >> >> part that is needed across all the replication scheme.
> > >> >>
> > >> >> Of course getting this right is not easy and design is tricky
but I
> > feel
> > >> >> its worth a try.
> > >> >>
> > >> >> This is the high level flow I had discussed with Greg.
> > >> >>
> > >> >> Each server can host P partitions and is listening at one port.
> > Based on
> > >> >> how many execution threads are needed, we can create C channels.
> > Every
> > >> >> partition maps to a specific channel. On each channel we can
> > guarantee
> > >> >> message ordering. We simply provide a callback to handle each
> message
> > >> and
> > >> >> pass the received bytes without deserializing. Its up to the
> > callback to
> > >> >> either process it synchronously or asynchronously.
> > >> >>
> > >> >> There are more details that its difficult to go over in the email.
> We
> > >> >> should probably write up the design and then deep dive on the
> > details.
> > >> >>
> > >> >> thanks,
> > >> >> Kishore G
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >> On Sat, Jul 12, 2014 at 8:08 AM, vlad.gm@gmail.com <
> > vlad.gm@gmail.com>
> > >> >> wrote:
> > >> >>
> > >> >> > I agree that having a messaging scheme that is not
> > ZooKeeper-reliant
> > >> >> would
> > >> >> > help in building some synchronization protocols between clients
> and
> > >> >> > replicas. I am thinking of cases such as writing inside Kafka
to
> > >> >> replicated
> > >> >> > partitions our our own application,a replicated Key/Value
store.
> > >> >> >
> > >> >> > For me, service discovery bridges the gap between an application
> > >> wanting
> > >> >> > to address a service (in Helix's case more particularly,
a
> > >> >> service/resource
> > >> >> > name/partition tuple) and finding out a list of hostnames
that
> can
> > >> >> receive
> > >> >> > that request. After that, whether the application wants to
use
> > Helix
> > >> >> itself
> > >> >> > or another messaging stack is an open problem, perhaps a
clear
> > >> separation
> > >> >> > of the discovery/messaging layers would allow for quickly
> swapping
> > >> >> > different pieces here.
> > >> >> >
> > >> >> > The interesting part (in my view) in providing fast messaging
as
> > part
> > >> of
> > >> >> > Helix is the ability to use the service topology information
that
> > >> Helix
> > >> >> > hosts (number of replicas and their placement) together with
the
> > >> >> messaging
> > >> >> > layer as sufficient components for realizing a consensus
> algorithm
> > >> >> between
> > >> >> > replicas, that is providing, on top of the regular service
> > discovery
> > >> >> > function of writing to one instance of a service, the function
of
> > >> writing
> > >> >> > with consensus verification to all replicas of a service.
> > >> >> >
> > >> >> > In terms of capabilities, looking at Kafka and KV/Store as
use
> > cases,
> > >> >> this
> > >> >> > would mean running a whole protocol between the replicas
of the
> > >> service
> > >> >> to
> > >> >> > achieve a certain synchronization outcome (be it Paxos, 2
Phase
> > >> Commit or
> > >> >> > Chain Replication - Paxos would probably not be needed since
> Helix
> > >> >> already
> > >> >> > provides a resilient master for the partition, so there is
a
> clear
> > >> >> > hierarchy). Kafka suggests even more cases if we take the
> > interaction
> > >> >> > between client and service into account, namely specifying
the
> > call to
> > >> >> > return after the master gets the data or after consensus
between
> > >> master
> > >> >> and
> > >> >> > replicas have been achieved, or sending data asynchronously
(the
> > >> >> > combination of the producer.type and request.required.acks
> > parameters
> > >> of
> > >> >> > the producer).
> > >> >> >
> > >> >> > The only reason for which I would like to see a pluggable
> > >> architecture is
> > >> >> > that in high volume applications the messaging stack requires
a
> > lot of
> > >> >> > tuning. As I said at the meet-up, in our company we deal
with
> about
> > >> 1.5M
> > >> >> > QPS, so having serialization/deserialization overheads per
> > message, or
> > >> >> > event separate listening/executor threads can be resource
> > intensive.
> > >> >> > Whenever we can, we are tempted to use a stage-event drive
> > >> architecture,
> > >> >> > with few threads that handle large message pools. This also
> raises
> > the
> > >> >> > question of how to implement the upper-layer callbacks (that
> would
> > >> >> perform
> > >> >> > the synchronization-related actions) without spawning new
> threads.
> > >> >> >
> > >> >> > Regards,
> > >> >> > Vlad
> > >> >> >
> > >> >> >
> > >> >> > On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kishore@gmail.com
> >
> > >> wrote:
> > >> >> >
> > >> >> >> Good point Vlad, I was thinking of defining the right
> abstraction
> > and
> > >> >> >> possibly provide one implementation based. I think the
> synchronous
> > >> and
> > >> >> >> asynchronous messaging should be covered as part of this.
> > >> >> >>
> > >> >> >> Also I think Finagle and likes of it cater more towards
the
> client
> > >> >> server
> > >> >> >> communication but what we lack today is a good solution
for peer
> > to
> > >> peer
> > >> >> >> transport. For example, if some one has to build a consensus
> layer
> > >> they
> > >> >> >> have build everything from ground up. Providing a base
layer
> that
> > >> >> exchange
> > >> >> >> messages between peers on a per partition basis can be
a great
> > >> building
> > >> >> >> block to build different replication schemes.
> > >> >> >>
> > >> >> >> Overall messaging can be used for two cases
> > >> >> >> -- data and control.
> > >> >> >>
> > >> >> >>
> > >> >> >> CONTROL
> > >> >> >>
> > >> >> >> This is needed for control messages that occur rarely,
for these
> > >> type of
> > >> >> >> messages latency/throughput is not important but its
important
> > for it
> > >> >> to be
> > >> >> >> reliable.
> > >> >> >>
> > >> >> >>
> > >> >> >> DATA
> > >> >> >>
> > >> >> >> This is mainly exchanging data between different roles,
> > >> >> >> controller-participant, participant-participant,
> > >> spectator-participant.
> > >> >> >> These types of message exchanges occur quite frequently
and
> having
> > >> high
> > >> >> >> throughput and low latency is a requirement.
> > >> >> >>
> > >> >> >> For example having the following api and guarantee would
allow
> > one to
> > >> >> >> build synchronous replication, asynchronous, quorum etc.
> > >> >> >>
> > >> >> >> send(partition, message, ACK_MODE, callback) ACK_MODE
can be
> like
> > ACK
> > >> >> >> from ALL, QUORUM, 1, NONE etc. callback is fired when
one gets
> the
> > >> >> response
> > >> >> >> back from the receiver.
> > >> >> >>
> > >> >> >> This simple api can allow both synchronous and asynchronous
mode
> > of
> > >> >> >> communication. We dont have to do the
> > serialization/deserialization.
> > >> The
> > >> >> >> hard part here would be to guarantee ordering between
the
> > messages.
> > >> One
> > >> >> >> should be able to specify the message ordering requirement,
FIFO
> > on a
> > >> >> per
> > >> >> >> partition level or dont care about ordering. Having this
makes
> it
> > >> easy
> > >> >> for
> > >> >> >> one to implement replication schemes.
> > >> >> >>
> > >> >> >>  thanks,
> > >> >> >> Kishore G
> > >> >> >>
> > >> >> >>
> > >> >> >> On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <
> > brandt.greg@gmail.com
> > >> >
> > >> >> >> wrote:
> > >> >> >>
> > >> >> >>> Vlad,
> > >> >> >>>
> > >> >> >>> I'm not sure that the goal here is to cover all possible
use
> > cases.
> > >> >> This
> > >> >> >>> is
> > >> >> >>> intended as basically a replacement for the current
Helix
> cluster
> > >> >> >>> messaging
> > >> >> >>> service, which doesn't rely on ZooKeeper. I would
argue that
> > Helix
> > >> is
> > >> >> >>> already that general framework for discovering the
service
> > endpoint,
> > >> >> and
> > >> >> >>> that this is just one implementation of an underlying
messaging
> > >> stack
> > >> >> >>> (for
> > >> >> >>> which there is currently only the ZooKeeper-based
> > implementation).
> > >> >> >>>
> > >> >> >>> Moreover, keeping it too general (i.e. providing
the APIs
> > outlined
> > >> in
> > >> >> the
> > >> >> >>> JIRA, but no implementation) puts much greater onus
on the
> user.
> > It
> > >> >> would
> > >> >> >>> be nice for someone just starting out with Helix
to have a
> pretty
> > >> good
> > >> >> >>> messaging service readily available, as well as the
APIs to
> > >> implement
> > >> >> >>> more
> > >> >> >>> specific solutions if he or she so chooses.
> > >> >> >>>
> > >> >> >>> -Greg
> > >> >> >>>
> > >> >> >>>
> > >> >> >>> On Fri, Jul 11, 2014 at 11:07 AM, vlad.gm@gmail.com
<
> > >> vlad.gm@gmail.com
> > >> >> >
> > >> >> >>> wrote:
> > >> >> >>>
> > >> >> >>> > This sounds like service discovery for a messaging
solution.
> At
> > >> this
> > >> >> >>> point
> > >> >> >>> > there would be some overlap with message buses
such as
> Finagle.
> > >> >> Perhaps
> > >> >> >>> > this should be a general framework for discovering
the
> service
> > >> >> endpoint
> > >> >> >>> > that could leave the actual underlaying messaging
stack open
> > (be
> > >> it
> > >> >> >>> > Finagle, Netty-based or ZeroMQ, for example).
My only fear is
> > >> that if
> > >> >> >>> the
> > >> >> >>> > messaging framework is encapsulated completely
into Helix, it
> > >> would
> > >> >> be
> > >> >> >>> hard
> > >> >> >>> > to compete with tuned messaging bus solutions
and cover all
> > >> possible
> > >> >> >>> use
> > >> >> >>> > cases (for example, in my company, we use a
large number of
> sub
> > >> cases
> > >> >> >>> on
> > >> >> >>> > the synchronous call to asynchronous call spectrum).
> > >> >> >>> >
> > >> >> >>> > Regards,
> > >> >> >>> > Vlad
> > >> >> >>> >
> > >> >> >>> >
> > >> >> >>> > On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt
<
> > >> brandt.greg@gmail.com
> > >> >> >
> > >> >> >>> > wrote:
> > >> >> >>> >
> > >> >> >>> >> (copied from HELIX-470)
> > >> >> >>> >>
> > >> >> >>> >> Helix is missing a high-performance way
to exchange messages
> > >> among
> > >> >> >>> >> resource partitions, with a user-friendly
API.
> > >> >> >>> >>
> > >> >> >>> >> Currently, the Helix messaging service relies
on creating
> many
> > >> nodes
> > >> >> >>> in
> > >> >> >>> >> ZooKeeper, which can lead to ZooKeeper outages
if messages
> are
> > >> sent
> > >> >> >>> too
> > >> >> >>> >> frequently.
> > >> >> >>> >>
> > >> >> >>> >> In order to avoid this, high-performance
NIO-based
> HelixActors
> > >> >> should
> > >> >> >>> be
> > >> >> >>> >> implemented (in rough accordance with the
actor model).
> > >> HelixActors
> > >> >> >>> exchange
> > >> >> >>> >> messages asynchronously without waiting
for a response, and
> > are
> > >> >> >>> >> partition/state-addressable.
> > >> >> >>> >>
> > >> >> >>> >> The API would look something like this:
> > >> >> >>> >>
> > >> >> >>> >> public interface HelixActor<T> {
> > >> >> >>> >>     void send(Partition partition, String
state, T message);
> > >> >> >>> >>     void register(String resource, HelixActorCallback<T>
> > >> callback);
> > >> >> >>> >> }
> > >> >> >>> >> public interface HelixActorCallback<T>
{
> > >> >> >>> >>     void onMessage(Partition partition,
State state, T
> > message);
> > >> >> >>> >> }
> > >> >> >>> >>
> > >> >> >>> >> #send should likely support wildcards for
partition number
> and
> > >> >> state,
> > >> >> >>> or
> > >> >> >>> >> its method signature might need to be massaged
a little bit
> > for
> > >> more
> > >> >> >>> >> flexibility. But that's the basic idea.
> > >> >> >>> >>
> > >> >> >>> >> Nothing is inferred about the format of
the messages - the
> > only
> > >> >> >>> metadata
> > >> >> >>> >> we need to be able to interpret is (1) partition
name and
> (2)
> > >> state.
> > >> >> >>> The
> > >> >> >>> >> user provides a codec to encode / decode
messages, so it's
> > nicer
> > >> to
> > >> >> >>> >> implementHelixActor#send and HelixActorCallback#onMessage.
> > >> >> >>> >>
> > >> >> >>> >> public interface HelixActorMessageCodec<T>
{
> > >> >> >>> >>     byte[] encode(T message);
> > >> >> >>> >>     T decode(byte[] message);
> > >> >> >>> >> }
> > >> >> >>> >>
> > >> >> >>> >> Actors should support somewhere around 100k
to 1M messages
> per
> > >> >> second.
> > >> >> >>> >> The Netty framework is a potential implementation
candidate,
> > but
> > >> >> >>> should be
> > >> >> >>> >> thoroughly evaluated w.r.t. performance.
> > >> >> >>> >>
> > >> >> >>> >
> > >> >> >>> >
> > >> >> >>>
> > >> >> >>
> > >> >> >>
> > >> >> >
> > >> >>
> > >>
> >
>

Mime
View raw message