incubator-s4-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Matthieu Morel <>
Subject Re: Using Helix for cluster management of S4
Date Fri, 30 Nov 2012 18:56:39 GMT

Regarding redistribution of partitions with stateful PEs, another simple solution is to activate

Then when you need to repartition:
- on the origin node, upon a rebalancing request for partition p, you simply invalidate PE
instances from partition p from the local cache (and trigger checkpointing if necessary)
- on the destination node, there is nothing to do, state for PEs belonging to partition p
is lazily reloaded (i.e. when necessary).

Of course there are other solutions, such as manually snapshotting to a data store, but that
may be more complex.


On Nov 30, 2012, at 7:44 PM, kishore g wrote:

> Matt, 
> Thats right the key idea is to over partition the stream( number of partitions higher
than the number of nodes).
> Helix supports two modes, when a new node is added it automatically moves partitions.
We call this AUTO_REBALANCE. This is recommended for stateless tasks. Another mode is SEMI_AUTO
where you can change the topology using admin command. So what  one would do is add a new
node and then rebalance the cluster by invoking an Helix Api, Helix will then re-distribute
partitions and as I said earlier will minimize the movement and co-ordinate the movement.
When I say co-ordinate, it will first ask the old leader to stop processing the partition.
It can then snapshot its state. Once thats done Helix will ask the new node to host the partition,
where it can load the snapshot. I will add this example to the walk through instruction.
>  Daniel,
> Yes that is possible, I think createTask and deployApp commands already take an optional
parameter to list a subset of nodes but I think I have only implemented it for deployApp.
Adding it for createTask( which assigns stream processing to s4 nodes) is straight forward.

> Matthieu mentioned that in the Readme instructions the adapter command is invoking the
old code. I will make that change in some time. If you are trying this now, then run GenericEventAdapter
from eclipse directly. ( The same options hold good).
> Yes JIRA is 110. I will add the description. I am pretty sure there will be issues :-)
> On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro <> wrote:
> I agree with Matthieu, that's a really nice integration!
> I particularly like having different partition schemes per stream. I guess it would be
easy (or at least possible) to implement some kind of isolation where only a subset of nodes
handles a specific stream, for example (related to S4-91).
> It looks really nice, I'm looking forward to trying it. I'll give more feedback if I
run into any issues. I guess the right JIRA for that would be S4-110, right? (It's missing
a description!)
> Good job!
> Daniel
> On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
> Thanks Kishore, that's a very interesting contribution!
> It's also very appropriate considering that S4 is completely decentralized and that there
is no driving/scheduling entity: the logic is within the nodes. So it's nice to have a way
to easily describe and define coordinated behaviors, and to easily automate them.
> About the partitioning, the key here as I understand it, is to have a number of partitions
higher than the number of nodes by default, possibly several times higher. So a given node
is assigned multiple partitions. (In contrast, until now in S4, nb partitions <= nb nodes,
including standby nodes).
> In the canonical example that you provide, how do we proceed if we want to add another
s4 node? That's not clear to me, and it would help understand how partitions are reassigned.
> Thanks!
> Matthieu
> In S4 the number of partition is fixed for all streams and is dependent on
> the number of nodes in the cluster.  Adding new nodes to S4 cluster causes
> the number of partitions to change. This results in lot of data movement.
> For example if there are 4 nodes and you add another node then nearly all
> keys will be remapped which result is huge data movement where as ideally
> only 20% of the data should move.
> By using Helix, every stream can be  partitioned differently and
> independent of the number of nodes. Helix distributes the partitions evenly
> among the nodes. When new nodes are added, partitions can be migrated to
> new nodes without changing the number of partitions and  minimizes the data
> movement.
> In S4 handles failures by having stand by nodes that are idle most of the
> time and become active when a node fails. Even though this works, its not
> ideal in terms of efficient hardware usage since the stand by nodes are
> idle most of the time. This also increases the fail over time since the PE
> state has to be transfered to only one node.
> Helix allows S4 to have Active and Standby nodes at a partition level so
> that all nodes can be active but some partitions will be Active and some in
> stand by mode. When a node fails, the partitions that were  Active on that
> node will be evenly distributed among the remaining nodes. This provides
> automatic load balancing and also improves fail over time, since PE state
> can be transfered to multiple nodes in parallel.
> I have a prototype implementation here
> Instructions to build it and try it out are in the Readme.
> More info on Helix can be found here,
> Helix can provide lot of other functionalities like
>    1. Configure the topology according to use case. For example, co-locate
>    the partitions of different streams to allow efficient joins. Configure the
>    number of standby for each partition based on the head room available.
>    2. When new nodes are added, it can throttle the data movement
>    3. Comes with large set of admin tools like enable/disable node,
>    dynamically change the topology etc. Provides a rest interface to manage
>    the cluster.
>    4. Allows one to schedule custom tasks like snapshot the PE's in a
>    partition and restore from the snapshot.
> Would like to get your feedback.
> Thanks,
> Kishore G

View raw message