From Daniel Gómez Ferro <>
Subject Re: Using Helix for cluster management of S4
Date Wed, 05 Dec 2012 15:42:51 GMT
Hi Kishore,

I just tried it and I think it is very promising!

I followed the readme and everything worked (apart from the adapter 
which you mentioned). Then I wanted to make a few changes to the 
partitioning but I couldn't get it to work, probably because I don't 
understand Helix. I'm sure I wasn't supposed to do most of the things I 
did, but maybe it's valuable to you! :)

First I added a new node using the helix-admin command and started the 
node. One of the partitions got assigned to the new node (and unassigned 
from the old one) but the new node didn't deploy the application. I 
tried doing a rebalance of myApp but that somehow messed the S4 

If I configure the cluster for 3 nodes and start only 2, it works great, 
even after starting the 3rd one (it rebalances the partitions correctly).

I also tried swapping two instances and that also worked (I think!). My 
only comment is that at first I had no idea how to disable an instance 
(it's done with --enable <cluster> <instance> false).

My guess is that I shouldn't be using the helix-admin command directly. 
Do we need to provide custom "s4 <command>" commands for rebalancing, 
bringing down nodes, etc.?

I think the work is great, sorry I couldn't resist playing with it! It 
would be great to have a short guide on how to do a basic rebalancing 
operation and so on.



On Tue Dec  4 20:00:04 2012, kishore g wrote:
> Daniel, did you get a chance to try it out? I have fixed some issues in the
> emitter code to use the partitioning per stream. Would love to hear your
> comments.
> I will create a branch S4-110 with my changes.
> On Fri, Nov 30, 2012 at 2:06 PM, kishore g <> wrote:
>> Agree.
>> On Fri, Nov 30, 2012 at 10:57 AM, Matthieu Morel <> wrote:
>>> Sorry, I had replied to the user list !
>>> Begin forwarded message:
>>>> From: Matthieu Morel <>
>>>> Subject: Re: Using Helix for cluster management of S4
>>>> Date: November 30, 2012 7:56:39 PM GMT+01:00
>>>> To:
>>>> Regarding redistribution of partitions with stateful PEs, another
>>> simple solution is to activate checkpointing.
>>>> Then when you need to repartition:
>>>> - on the origin node, upon a rebalancing request for partition p, you
>>> simply invalidate PE instances from partition p from the local cache (and
>>> trigger checkpointing if necessary)
>>>> - on the destination node, there is nothing to do, state for PEs
>>> belonging to partition p is lazily reloaded (i.e. when necessary).
>>>> Of course there are other solutions, such as manually snapshotting to a
>>> data store, but that may be more complex.
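
The checkpoint-and-lazy-reload idea above can be sketched as follows. This is only a toy illustration under stated assumptions, not S4's checkpointing code: `PartitionCache`, `release_partition` and the dict standing in for the checkpoint store are all hypothetical names.

```python
class PartitionCache:
    """Hypothetical per-node cache of PE state, backed by a shared checkpoint store."""

    def __init__(self, checkpoint_store):
        self.store = checkpoint_store  # (partition, key) -> checkpointed state
        self.cache = {}                # (partition, key) -> live PE state

    def get(self, partition, key, default=0):
        """Return PE state, lazily reloading it from the checkpoint store if needed."""
        pe_id = (partition, key)
        if pe_id not in self.cache:
            self.cache[pe_id] = self.store.get(pe_id, default)
        return self.cache[pe_id]

    def update(self, partition, key, state):
        """Record new live state for one PE instance."""
        self.cache[(partition, key)] = state

    def release_partition(self, partition):
        """Origin node: checkpoint and invalidate every PE of the given partition."""
        for pe_id in [i for i in self.cache if i[0] == partition]:
            self.store[pe_id] = self.cache.pop(pe_id)


store = {}                          # stands in for the checkpoint backend
origin = PartitionCache(store)
destination = PartitionCache(store)

origin.update(1, "user-42", 5)      # state built up on the origin node
origin.release_partition(1)         # rebalancing request for partition 1
restored = destination.get(1, "user-42")  # reloaded only when first needed
```

The destination node does no work at rebalancing time; the cost of reloading is paid on the first event for each key.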
>>>> Matthieu
>>>> On Nov 30, 2012, at 7:44 PM, kishore g wrote:
>>>>> Matt,
>>>>> That's right, the key idea is to over-partition the stream (number of
>>> partitions higher than the number of nodes).
>>>>> Helix supports two modes. In the first, when a new node is added it
>>> automatically moves partitions. We call this AUTO_REBALANCE. This is
>>> recommended for stateless tasks. The other mode is SEMI_AUTO, where you
>>> change the topology using an admin command. So what one would do is add a
>>> new node and then rebalance the cluster by invoking a Helix API. Helix will
>>> then redistribute partitions and, as I said earlier, will minimize and
>>> coordinate the movement. When I say coordinate, it will first ask the old
>>> leader to stop processing the partition. It can then snapshot its state.
>>> Once that's done, Helix will ask the new node to host the partition, where
>>> it can load the snapshot. I will add this example to the walkthrough
>>> instructions.
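
The coordinated move described above (the old owner stops and snapshots, then the new owner loads and starts) can be sketched as below. This is a minimal illustration under stated assumptions and does not use the Helix API: `Node`, `move_partition` and the dict used as a snapshot store are hypothetical.

```python
class Node:
    """Hypothetical S4 node holding PE state per partition."""

    def __init__(self, name):
        self.name = name
        self.state = {}  # partition -> dict of PE state

    def stop_and_snapshot(self, partition, snapshots):
        """Stop processing the partition and hand its state to the snapshot store."""
        snapshots[partition] = self.state.pop(partition, {})

    def load_and_start(self, partition, snapshots):
        """Start hosting the partition, restoring state from the snapshot."""
        self.state[partition] = dict(snapshots.get(partition, {}))


def move_partition(partition, old_owner, new_owner, snapshots):
    """Coordinator role: the new owner starts only after the old one has stopped."""
    old_owner.stop_and_snapshot(partition, snapshots)
    new_owner.load_and_start(partition, snapshots)


node_a, node_b = Node("a"), Node("b")
node_a.state[3] = {"count": 7}
snapshots = {}
move_partition(3, node_a, node_b, snapshots)
```

The ordering is the point of the sketch: at no time do both nodes process partition 3.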
>>>>>   Daniel,
>>>>> Yes, that is possible. I think the createTask and deployApp commands
>>> already take an optional parameter to list a subset of nodes but I think I
>>> have only implemented it for deployApp. Adding it for createTask (which
>>> assigns stream processing to s4 nodes) is straightforward.
>>>>> Matthieu mentioned that in the Readme instructions the adapter command
>>> is invoking the old code. I will make that change in some time. If you are
>>> trying this now, then run GenericEventAdapter from Eclipse directly. (The
>>> same options apply.)
>>>>> Yes, the JIRA is 110. I will add the description. I am pretty sure there
>>> will be issues :-)
>>>>> On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro <> wrote:
>>>>> I agree with Matthieu, that's a really nice integration!
>>>>> I particularly like having different partition schemes per stream. I
>>> guess it would be easy (or at least possible) to implement some kind of
>>> isolation where only a subset of nodes handles a specific stream, for
>>> example (related to S4-91).
>>>>> It looks really nice, I'm looking forward to trying it. I'll give more
>>> feedback if I run into any issues. I guess the right JIRA for that would be
>>> S4-110, right? (It's missing a description!)
>>>>> Good job!
>>>>> Daniel
>>>>> On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
>>>>> Thanks Kishore, that's a very interesting contribution!
>>>>> It's also very appropriate considering that S4 is completely
>>> decentralized and that there is no driving/scheduling entity: the logic is
>>> within the nodes. So it's nice to have a way to easily describe and define
>>> coordinated behaviors, and to easily automate them.
>>>>> About the partitioning, the key here as I understand it, is to have a
>>> number of partitions higher than the number of nodes by default, possibly
>>> several times higher. So a given node is assigned multiple partitions. (In
>>> contrast, until now in S4, nb partitions <= nb nodes, including standby
>>> nodes).
>>>>> In the canonical example that you provide, how do we proceed if we
>>> want to add another s4 node? That's not clear to me, and it would help
>>> understand how partitions are reassigned.
>>>>> Thanks!
>>>>> Matthieu
>>>>> In S4 the number of partitions is fixed for all streams and is
>>> dependent on
>>>>> the number of nodes in the cluster.  Adding new nodes to an S4 cluster
>>> causes
>>>>> the number of partitions to change. This results in a lot of data
>>> movement.
>>>>> For example, if there are 4 nodes and you add another node, then nearly
>>> all
>>>>> keys will be remapped, which results in huge data movement, whereas
>>> ideally
>>>>> only 20% of the data should move.
>>>>> By using Helix, every stream can be partitioned differently and
>>>>> independently of the number of nodes. Helix distributes the partitions
>>> evenly
>>>>> among the nodes. When new nodes are added, partitions can be migrated
>>> to
>>>>> new nodes without changing the number of partitions, which minimizes the
>>> data
>>>>> movement.
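
The 4-to-5 node example above can be checked with a small sketch. It assumes keys are routed by `hash % number_of_nodes` in the first case and by a fixed count of 20 partitions in the second. The rebalancing function is a simple greedy stand-in, not Helix's actual algorithm, and all names are hypothetical.

```python
def moved_fraction_modulo(hashed_keys, old_nodes, new_nodes):
    """Fraction of keys whose node changes when routing is hash % node_count."""
    moved = sum(1 for k in hashed_keys if k % old_nodes != k % new_nodes)
    return moved / len(hashed_keys)


def rebalance_minimal(assignment, new_node):
    """Move just enough partitions to new_node to even out the load.

    assignment maps partition -> node; a new dict is returned.
    """
    result = dict(assignment)
    nodes = set(result.values()) | {new_node}
    target = len(result) // len(nodes)       # partitions per node after the move
    load = {n: 0 for n in nodes}
    for owner in result.values():
        load[owner] += 1
    for partition in sorted(result):
        if load[new_node] >= target:
            break
        owner = result[partition]
        if load[owner] > target:             # only take from overloaded nodes
            result[partition] = new_node
            load[owner] -= 1
            load[new_node] += 1
    return result


hashed_keys = range(100_000)                 # stand-in for uniformly hashed keys
PARTITIONS = 20                              # assumed fixed partition count

modulo_moved = moved_fraction_modulo(hashed_keys, 4, 5)

before = {p: p % 4 for p in range(PARTITIONS)}   # 5 partitions on each of 4 nodes
after = rebalance_minimal(before, new_node=4)
fixed_moved = sum(
    1 for k in hashed_keys if before[k % PARTITIONS] != after[k % PARTITIONS]
) / len(hashed_keys)

print(f"hash % nodes: {modulo_moved:.0%} of keys move")
print(f"fixed partitions: {fixed_moved:.0%} of keys move")
```

With these inputs the modulo scheme moves 80% of the keys, while the fixed-partition scheme moves 4 of 20 partitions, i.e. 20% of the keys.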
>>>>> S4 handles failures by having standby nodes that are idle most of
>>> the
>>>>> time and become active when a node fails. Even though this works, it's
>>> not
>>>>> ideal in terms of efficient hardware usage since the standby nodes are
>>>>> idle most of the time. This also increases the failover time since
>>> the PE
>>>>> state has to be transferred to only one node.
>>>>> Helix allows S4 to have Active and Standby nodes at a partition level
>>> so
>>>>> that all nodes can be active but some partitions will be Active and
>>> some in
>>>>> Standby mode. When a node fails, the partitions that were Active on
>>> that
>>>>> node will be evenly distributed among the remaining nodes. This
>>> provides
>>>>> automatic load balancing and also improves failover time, since PE
>>> state
>>>>> can be transferred to multiple nodes in parallel.
>>>>> I have a prototype implementation here
>>>>> Instructions to build it and try it out are in the Readme.
>>>>> More info on Helix can be found here,
>>>>> Helix can provide a lot of other functionality, such as:
>>>>>     1. Configure the topology according to use case. For example,
>>> co-locate
>>>>>     the partitions of different streams to allow efficient joins.
>>> Configure the
>>>>>     number of standbys for each partition based on the headroom
>>> available.
>>>>>     2. When new nodes are added, it can throttle the data movement.
>>>>>     3. Comes with a large set of admin tools like enable/disable node,
>>>>>     dynamically change the topology, etc. Provides a REST interface to
>>> manage
>>>>>     the cluster.
>>>>>     4. Allows one to schedule custom tasks like snapshotting the PEs in a
>>>>>     partition and restoring from the snapshot.
>>>>> Would like to get your feedback.
>>>>> Thanks,
>>>>> Kishore G

