Hi Kishore,
I tried the new changes and they worked great!
I wanted to review the changes, but it would be much easier using
reviews.apache.org. Could you upload the diff there?
I also noticed there are dependencies on Helix from s4-core and s4-comm.
How hard would it be to extract the Helix-specific bits to a separate
module, similar to the YARN integration?
Thanks a lot for your work, it's a very nice integration!
Regards,
Daniel
On 1/3/13 9:06 AM, kishore g wrote:
> Hi Guys,
>
> Happy new year! I pushed the changes to the S4-110 branch. I have added
> instructions for testing the addition of nodes. I added support for tagging
> nodes with a group name. This allows one to specify the node group name
> at task creation/app deployment.
> Does it make sense?
>
> thanks,
> Kishore G
>
>
> On Wed, Dec 5, 2012 at 10:20 AM, kishore g <g.kishore@gmail.com> wrote:
>
> You were brave enough to try them. You did the right thing for
> adding nodes, disabling a node and swapping nodes. For rebalancing the
> App, we need to change the DeployApp code to add the new node. The
> reason rebalance did not work for the App is that it is set up as a CUSTOM
> assignment, which means S4 owns the mapping of Apps to Nodes. One
> way to work around this is to run DeployApp again and it will deploy
> the App to the new nodes.
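For anyone following the thread, here is a rough sketch of what "run DeployApp
again" boils down to for a CUSTOM-mode resource: since S4 owns the mapping, the
ideal state has to be rewritten to mention the new node. This is my own
illustration against the Helix admin API, not code from the branch, and the
cluster/resource/instance names ("s4Cluster", "myApp", "localhost_12002") are
made up:

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.IdealState;

    public class AddNodeToAppMapping {
        public static void main(String[] args) {
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

            // The App resource uses CUSTOM assignment, so a plain rebalance
            // will not touch it; the mapping has to be edited explicitly.
            IdealState appState = admin.getResourceIdealState("s4Cluster", "myApp");

            // Mark the app as ONLINE on the newly added node (assuming an
            // OnlineOffline-style state model for app deployment).
            appState.setPartitionState("myApp_0", "localhost_12002", "ONLINE");
            admin.setResourceIdealState("s4Cluster", "myApp", appState);
        }
    }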
>
> Using the Helix commands directly will work, but it's better to provide
> S4-specific commands for the operations we intend to support.
>
> Thanks again for trying it out.
>
> thanks,
> Kishore G
>
>
> On Wed, Dec 5, 2012 at 7:42 AM, Daniel Gómez Ferro
> <danielgf@yahoo-inc.com> wrote:
>
> Hi Kishore,
>
> I just tried it and I think it is very promising!
>
> I followed the readme and everything worked (apart from the
> adapter which you mentioned). Then I wanted to make a few
> changes to the partitioning but I couldn't get it to work,
> probably because I don't understand Helix. I'm sure I wasn't
> supposed to do most of the things I did, but maybe it's valuable
> to you! :)
>
> First I added a new node using the helix-admin command and
> started the node. One of the partitions got assigned to the new
> node (and unassigned from the old one) but the new node didn't
> deploy the application. I tried doing a rebalance of myApp but
> that somehow messed up the S4 configuration.
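For reference, this is roughly what those steps look like through the Helix
Java admin API instead of helix-admin.sh; it is my own sketch, not code from
the branch, and the cluster/instance/resource names ("s4Cluster",
"localhost_12002", "myStream") are made up:

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.InstanceConfig;

    public class AddNodeExample {
        public static void main(String[] args) {
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

            // Register a new participant (the S4 node itself still has to be started)
            InstanceConfig newNode = new InstanceConfig("localhost_12002");
            newNode.setHostName("localhost");
            newNode.setPort("12002");
            newNode.setInstanceEnabled(true);
            admin.addInstance("s4Cluster", newNode);

            // Ask Helix to recompute the partition assignment for the stream,
            // keeping one replica per partition
            admin.rebalance("s4Cluster", "myStream", 1);
        }
    }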
>
> If I configure the cluster for 3 nodes and start only 2, it
> works great, even after starting the 3rd one (it rebalances the
> partitions correctly).
>
> I also tried swapping two instances and that also worked (I
> think!). The only comment is that at first I had no idea how to
> disable an instance (it's done with --enable <cluster>
> <instance> false).
>
> My guess is that I shouldn't be using the helix-admin command
> directly. Do we need to provide custom "s4 <command>" commands for
> rebalancing, bringing down nodes, etc.?
>
> I think the work is great, sorry I couldn't resist playing with
> it! It would be great to have a short guide on how to do a basic
> rebalancing operation and so on.
>
> Regards,
>
> Daniel
>
>
> On Tue Dec 4 20:00:04 2012, kishore g wrote:
>
> Daniel, did you get a chance to try it out? I have fixed
> some issues in the
> emitter code to use the partitioning per stream. Would love
> to hear your
> comments.
>
> I will create a branch S4-110 with my changes.
>
>
> On Fri, Nov 30, 2012 at 2:06 PM, kishore g
> <g.kishore@gmail.com> wrote:
>
> Agree.
>
>
> On Fri, Nov 30, 2012 at 10:57 AM, Matthieu Morel
> <mmorel@apache.org> wrote:
>
> Sorry, I had replied to the user list!
>
> Begin forwarded message:
>
> From: Matthieu Morel <mmorel@apache.org>
> Subject: Re: Using Helix for cluster management of S4
> Date: November 30, 2012 7:56:39 PM GMT+01:00
> To: s4-user@incubator.apache.org
>
>
> Regarding redistribution of partitions with stateful PEs, another
> simple solution is to activate checkpointing.
>
> Then when you need to repartition:
> - on the origin node, upon a rebalancing request for partition p, you
> simply invalidate PE instances from partition p from the local cache
> (and trigger checkpointing if necessary)
> - on the destination node, there is nothing to do, state for PEs
> belonging to partition p is lazily reloaded (i.e. when necessary).
>
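To make the two sides concrete, here is a self-contained sketch with made-up
names (PartitionHandoffListener, PECache, CheckpointStore); it is not an
actual S4 or Helix API, just an illustration of the division of work described
above:

    // Origin/destination responsibilities for a checkpointing-based hand-off
    interface CheckpointStore { /* writes/reads PE snapshots, e.g. a shared file system */ }

    interface PECache {
        // checkpoint all PE instances of a partition (if needed) and evict them locally
        void checkpointAndRemove(int partition, CheckpointStore store);
    }

    interface PartitionHandoffListener {
        void onPartitionRevoked(int partition);   // origin node
        void onPartitionAssigned(int partition);  // destination node
    }

    class CheckpointingHandoff implements PartitionHandoffListener {
        private final PECache peCache;
        private final CheckpointStore store;

        CheckpointingHandoff(PECache peCache, CheckpointStore store) {
            this.peCache = peCache;
            this.store = store;
        }

        // Origin node: drop partition p from the local cache, checkpointing first
        public void onPartitionRevoked(int partition) {
            peCache.checkpointAndRemove(partition, store);
        }

        // Destination node: nothing to do eagerly; PE state for partition p is
        // reloaded lazily from the checkpoint store when the first event arrives
        public void onPartitionAssigned(int partition) {
            // intentionally empty: lazy reload on first access
        }
    }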
>
> Of course there are other solutions, such as manually snapshotting to a
> data store, but that may be more complex.
>
>
> Matthieu
>
> On Nov 30, 2012, at 7:44 PM, kishore g wrote:
>
>
> Matt,
>
> That's right, the key idea is to over-partition the stream (number of
> partitions higher than the number of nodes).
>
>
> Helix supports two modes. In one, when a new node is added it
> automatically moves partitions; we call this AUTO_REBALANCE and it is
> recommended for stateless tasks. Another mode is SEMI_AUTO, where you
> can change the topology using an admin command. So what one would do
> is add a new node and then rebalance the cluster by invoking a Helix
> API. Helix will then re-distribute partitions and, as I said earlier,
> will minimize the movement and co-ordinate the movement. When I say
> co-ordinate, it will first ask the old leader to stop processing the
> partition. It can then snapshot its state. Once that's done, Helix
> will ask the new node to host the partition, where it can load the
> snapshot. I will add this example to the walkthrough instructions.
>
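Here is a rough sketch (not code from the S4-110 branch) of how that
coordinated hand-off could look on the node side with Helix's participant
state model callbacks; the snapshotPartition()/loadSnapshot() helpers are
placeholders:

    import org.apache.helix.NotificationContext;
    import org.apache.helix.model.Message;
    import org.apache.helix.participant.statemachine.StateModel;
    import org.apache.helix.participant.statemachine.StateModelInfo;
    import org.apache.helix.participant.statemachine.Transition;

    @StateModelInfo(initialState = "OFFLINE", states = {"ONLINE", "OFFLINE", "ERROR"})
    public class StreamPartitionStateModel extends StateModel {

        // Old owner: Helix first asks it to stop processing the partition
        @Transition(from = "ONLINE", to = "OFFLINE")
        public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
            String partition = message.getPartitionName();
            // stop consuming events for this partition, then snapshot its PE state
            snapshotPartition(partition);
        }

        // New owner: asked to host the partition only after the old owner is done
        @Transition(from = "OFFLINE", to = "ONLINE")
        public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
            String partition = message.getPartitionName();
            loadSnapshot(partition);
            // start processing events for this partition
        }

        private void snapshotPartition(String partition) { /* placeholder */ }
        private void loadSnapshot(String partition) { /* placeholder */ }
    }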
>
> Daniel,
>
> Yes, that is possible. I think the createTask and deployApp commands
> already take an optional parameter to list a subset of nodes, but I
> think I have only implemented it for deployApp. Adding it for
> createTask (which assigns stream processing to S4 nodes) is
> straightforward.
>
>
> Matthieu mentioned that in the Readme instructions the adapter
> command is invoking the old code. I will make that change in some
> time. If you are trying this now, then run GenericEventAdapter from
> Eclipse directly. (The same options hold good.)
>
>
> Yes, the JIRA is S4-110. I will add the description. I am pretty sure
> there will be issues :-)
>
>
>
>
> On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro
> <danielgf@yahoo-inc.com> wrote:
>
> I agree with Matthieu, that's a really nice integration!
>
> I particularly like having different partition schemes per stream. I
> guess it would be easy (or at least possible) to implement some kind
> of isolation where only a subset of nodes handles a specific stream,
> for example (related to S4-91).
>
>
> It looks really nice, I'm looking forward to trying it. I'll give
> more feedback if I run into any issues. I guess the right JIRA for
> that would be S4-110, right? (It's missing a description!)
>
>
> Good job!
>
> Daniel
>
>
> On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
>
> Thanks Kishore, that's a very interesting contribution!
>
> It's also very appropriate considering that S4 is completely
> decentralized and that there is no driving/scheduling entity: the
> logic is within the nodes. So it's nice to have a way to easily
> describe and define coordinated behaviors, and to easily automate
> them.
>
>
> About the partitioning, the key here, as I understand it, is to have
> a number of partitions higher than the number of nodes by default,
> possibly several times higher. So a given node is assigned multiple
> partitions. (In contrast, until now in S4, nb partitions <= nb nodes,
> including standby nodes).
>
>
> In the canonical example that you provide, how do we proceed if we
> want to add another S4 node? That's not clear to me, and it would
> help to understand how partitions are reassigned.
>
>
> Thanks!
>
> Matthieu
>
>
>
>
> In S4 the number of partitions is fixed for all streams and is
> dependent on the number of nodes in the cluster. Adding new nodes to
> an S4 cluster causes the number of partitions to change. This results
> in a lot of data movement.
>
> For example, if there are 4 nodes and you add another node then
> nearly all keys will be remapped, which results in huge data
> movement, whereas ideally only 20% of the data should move.
>
> By using Helix, every stream can be partitioned differently and
> independently of the number of nodes. Helix distributes the
> partitions evenly among the nodes. When new nodes are added,
> partitions can be migrated to new nodes without changing the number
> of partitions, which minimizes the data movement.
>
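As a quick sanity check of the data-movement argument (my own illustration,
not code from the prototype; the 20-partition figure is just an example):

    public class RemappingDemo {
        public static void main(String[] args) {
            int keys = 100_000;

            // Naive scheme: partition count follows the node count (4 -> 5 nodes)
            int movedKeys = 0;
            for (int k = 0; k < keys; k++) {
                if (k % 4 != k % 5) {
                    movedKeys++;
                }
            }
            // ~80% of the keys change node
            System.out.printf("hash %% numNodes : %.0f%% of keys move%n",
                    100.0 * movedKeys / keys);

            // Over-partitioned scheme: 20 fixed partitions, 5 per node on 4 nodes.
            // A movement-minimizing rebalance hands exactly one partition from each
            // old node to the new node, so 4 of 20 partitions (20% of keys) move.
            int partitions = 20;
            int perNodeBefore = partitions / 4;  // 5
            int perNodeAfter = partitions / 5;   // 4
            int movedPartitions = 4 * (perNodeBefore - perNodeAfter);
            System.out.printf("fixed partitions: %d of %d partitions move (%.0f%% of keys)%n",
                    movedPartitions, partitions, 100.0 * movedPartitions / partitions);
        }
    }
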
> S4 handles failures by having standby nodes that are idle most of the
> time and become active when a node fails. Even though this works,
> it's not ideal in terms of efficient hardware usage since the standby
> nodes are idle most of the time. This also increases the failover
> time since the PE state has to be transferred to only one node.
>
> Helix allows S4 to have Active and Standby nodes at a partition
> level, so that all nodes can be active but some partitions will be
> Active and some in Standby mode. When a node fails, the partitions
> that were Active on that node will be evenly distributed among the
> remaining nodes. This provides automatic load balancing and also
> improves failover time, since PE state can be transferred to multiple
> nodes in parallel.
>
> I have a prototype implementation here:
> https://github.com/kishoreg/incubator-s4
>
> Instructions to build it and try it out are in the Readme.
>
> More info on Helix can be found here:
> http://helix.incubator.apache.org/
>
>
> Helix can provide a lot of other functionality, for example:
>
> 1. Configure the topology according to the use case. For example,
> co-locate the partitions of different streams to allow efficient
> joins. Configure the number of standbys for each partition based on
> the head room available.
> 2. When new nodes are added, it can throttle the data movement.
> 3. Comes with a large set of admin tools like enable/disable node,
> dynamically change the topology, etc. Provides a REST interface to
> manage the cluster.
> 4. Allows one to schedule custom tasks like snapshotting the PEs in a
> partition and restoring from the snapshot.
>
> Would like to get your feedback.
>
> Thanks,
> Kishore G
>