Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3373D109CC for ; Thu, 9 May 2013 01:28:00 +0000 (UTC) Received: (qmail 99701 invoked by uid 500); 9 May 2013 01:28:00 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 99662 invoked by uid 500); 9 May 2013 01:28:00 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 99647 invoked by uid 99); 9 May 2013 01:28:00 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 May 2013 01:28:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 May 2013 01:27:47 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id ED6EC23888EA; Thu, 9 May 2013 01:27:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1480516 [1/24] - in /incubator/helix/site-content: ./ apidocs/reference/org/apache/helix/ apidocs/reference/org/apache/helix/messaging/handling/ apidocs/reference/org/apache/helix/model/ apidocs/reference/org/apache/helix/participant/state... 
Date: Thu, 09 May 2013 01:27:18 -0000 To: commits@helix.incubator.apache.org From: kishoreg@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130509012724.ED6EC23888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kishoreg Date: Thu May 9 01:27:17 2013 New Revision: 1480516 URL: http://svn.apache.org/r1480516 Log: Site checkin for project Apache Helix Added: incubator/helix/site-content/ApiUsage.html incubator/helix/site-content/Sample_App.html incubator/helix/site-content/apidocs/reference/org/apache/helix/ConfigScope.ConfigScopeProperty.html incubator/helix/site-content/apidocs/reference/org/apache/helix/ConfigScope.html incubator/helix/site-content/apidocs/reference/org/apache/helix/ConfigScopeBuilder.html incubator/helix/site-content/apidocs/reference/org/apache/helix/messaging/handling/HelixTask.TimeoutCancelTask.html incubator/helix/site-content/apidocs/reference/org/apache/helix/model/ClusterConstraints.ConstraintItem.html incubator/helix/site-content/apidocs/reference/org/apache/helix/model/IdealState.AutoModeBuilder.html incubator/helix/site-content/apidocs/reference/org/apache/helix/model/IdealState.CustomBuilder.html incubator/helix/site-content/apidocs/reference/org/apache/helix/model/IdealState.SemiAutoBuilder.html incubator/helix/site-content/apidocs/reference/org/apache/helix/participant/statemachine/StateTransitionTableBuilder.html incubator/helix/site-content/apidocs/reference/org/apache/helix/tools/IdealStateCalculatorForStorageNode.html incubator/helix/site-content/apidocs/reference/org/apache/helix/tools/PropertiesReader.html incubator/helix/site-content/involved/release.html incubator/helix/site-content/xref-test/org/apache/helix/ExternalCommand.html incubator/helix/site-content/xref-test/org/apache/helix/integration/TestGroupMessage.html incubator/helix/site-content/xref-test/org/apache/helix/integration/TestMessageThrottle2.html incubator/helix/site-content/xref/org/apache/helix/ConfigScope.html 
incubator/helix/site-content/xref/org/apache/helix/ConfigScopeBuilder.html incubator/helix/site-content/xref/org/apache/helix/participant/statemachine/StateTransitionTableBuilder.html incubator/helix/site-content/xref/org/apache/helix/tools/IdealStateCalculatorForStorageNode.html incubator/helix/site-content/xref/org/apache/helix/tools/PropertiesReader.html Modified: incubator/helix/site-content/checkstyle-aggregate.html incubator/helix/site-content/checkstyle.rss incubator/helix/site-content/distribution-management.html incubator/helix/site-content/helix-admin-webapp/dependencies.html incubator/helix/site-content/helix-admin-webapp/distribution-management.html incubator/helix/site-content/helix-agent/dependencies.html incubator/helix/site-content/helix-agent/distribution-management.html incubator/helix/site-content/helix-core/dependencies.html incubator/helix/site-content/helix-core/distribution-management.html incubator/helix/site-content/index.html incubator/helix/site-content/mockservice/dependencies.html incubator/helix/site-content/mockservice/distribution-management.html incubator/helix/site-content/recipes/distributed-lock-manager/dependencies.html incubator/helix/site-content/recipes/distributed-lock-manager/distribution-management.html incubator/helix/site-content/recipes/distribution-management.html incubator/helix/site-content/recipes/rabbitmq-consumer-group/dependencies.html incubator/helix/site-content/recipes/rabbitmq-consumer-group/distribution-management.html incubator/helix/site-content/recipes/rsync-replicated-file-system/dependencies.html incubator/helix/site-content/recipes/rsync-replicated-file-system/distribution-management.html incubator/helix/site-content/recipes/service-discovery/dependencies.html incubator/helix/site-content/recipes/service-discovery/distribution-management.html incubator/helix/site-content/recipes/task-execution/dependencies.html incubator/helix/site-content/recipes/task-execution/distribution-management.html 
incubator/helix/site-content/releasing.html incubator/helix/site-content/xref-test/allclasses-frame.html incubator/helix/site-content/xref-test/org/apache/helix/integration/TestAutoRebalance.html incubator/helix/site-content/xref-test/org/apache/helix/integration/TestMessagingService.html incubator/helix/site-content/xref-test/org/apache/helix/integration/package-frame.html incubator/helix/site-content/xref-test/org/apache/helix/integration/package-summary.html incubator/helix/site-content/xref-test/org/apache/helix/messaging/TestAsyncCallback.html incubator/helix/site-content/xref-test/org/apache/helix/messaging/TestAsyncCallbackSvc.html incubator/helix/site-content/xref-test/org/apache/helix/messaging/package-frame.html incubator/helix/site-content/xref-test/org/apache/helix/messaging/package-summary.html incubator/helix/site-content/xref/allclasses-frame.html incubator/helix/site-content/xref/org/apache/helix/NotificationContext.html incubator/helix/site-content/xref/org/apache/helix/PropertyType.html incubator/helix/site-content/xref/org/apache/helix/messaging/handling/HelixTaskExecutor.html incubator/helix/site-content/xref/org/apache/helix/model/AlertStatus.html incubator/helix/site-content/xref/org/apache/helix/model/Alerts.html incubator/helix/site-content/xref/org/apache/helix/model/package-frame.html incubator/helix/site-content/xref/org/apache/helix/model/package-summary.html incubator/helix/site-content/xref/org/apache/helix/package-frame.html incubator/helix/site-content/xref/org/apache/helix/package-summary.html Added: incubator/helix/site-content/ApiUsage.html URL: http://svn.apache.org/viewvc/incubator/helix/site-content/ApiUsage.html?rev=1480516&view=auto ============================================================================== --- incubator/helix/site-content/ApiUsage.html (added) +++ incubator/helix/site-content/ApiUsage.html Thu May 9 01:27:17 2013 @@ -0,0 +1,659 @@

Create an instance of Manager

The first step in using the Helix API is creating a Helix manager instance. It requires the following parameters:

  • clusterName: A logical name to represent the group of nodes.
  • instanceName: A logical name of the process creating the manager instance. Generally this is host:port.
  • instanceType: Type of the process. This can be one of the following types:
    • CONTROLLER: Process that controls the cluster. Any number of controllers can be started, but only one will be active at any given time.
    • PARTICIPANT: Process that performs the actual task in the distributed system.
    • SPECTATOR: Process that observes the changes in the cluster.
    • ADMIN: Process that carries out system admin actions.
  • zkConnectString: Connection string to ZooKeeper. This is of the form host1:port1,host2:port2,host3:port3.
    manager = HelixManagerFactory.getZKHelixManager(clusterName,
                                                    instanceName,
                                                    instanceType,
                                                    zkConnectString);
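Because zkConnectString has a fixed shape, it can be worth validating it before it reaches HelixManagerFactory. The sketch below is a hypothetical helper in plain Java (ZkConnectStrings and parse are not part of Helix). It assumes a plain comma-separated host:port list and rejects, rather than handles, an optional ZooKeeper chroot suffix such as host:2181/helix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Helix: splits "host1:port1,host2:port2,..." into
// host/port pairs and rejects malformed entries before the string reaches HelixManagerFactory.
final class ZkConnectStrings {
  static final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
      this.host = host;
      this.port = port;
    }
  }

  static List<HostPort> parse(String zkConnectString) {
    List<HostPort> result = new ArrayList<>();
    for (String entry : zkConnectString.split(",")) {
      String trimmed = entry.trim();
      int colon = trimmed.lastIndexOf(':');
      if (colon <= 0 || colon == trimmed.length() - 1) {
        throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
      }
      // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for non-numeric ports,
      // which is also what happens to a chroot suffix like "2181/helix".
      int port = Integer.parseInt(trimmed.substring(colon + 1));
      if (port < 1 || port > 65535) {
        throw new IllegalArgumentException("Port out of range in '" + trimmed + "'");
      }
      result.add(new HostPort(trimmed.substring(0, colon), port));
    }
    return result;
  }
}
```

For example, ZkConnectStrings.parse("zk1:2181,zk2:2181,zk3:2181") yields three entries, while "zk1" or "zk1:abc" throws an IllegalArgumentException.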

Setting up a cluster

The initial setup of a cluster involves creating the appropriate znodes in ZooKeeper.

    // Create the setup tool instance
    ClusterSetup setupTool = new ClusterSetup(zkConnectString);
    // Create the cluster namespace in ZooKeeper
    setupTool.addCluster(clusterName, true);
    // Add six participant instances; each instance must have a unique id. host:port is the standard convention
    String[] instances = new String[6];
    for (int i = 0; i < instances.length; i++)
    {
      instances[i] = "localhost:" + (8900 + i);
    }
    setupTool.addInstancesToCluster(clusterName, instances);
    // Add the resource with 10 partitions to the cluster, using the MasterSlave state model.
    // See the section on how to configure an application-specific state model
    setupTool.addResourceToCluster(clusterName, "TestDB", 10, "MasterSlave");
    // Assign partitions to instances. The assignment algorithm is based on consistent hashing and RUSH.
    // See how to do custom partition assignment
    setupTool.rebalanceResource(clusterName, "TestDB", 3);
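The rebalance step above places the replicas of each of the 10 partitions on the 6 instances (assuming the last argument, 3, is the replica count). The text describes Helix's placement as based on consistent hashing and RUSH; the sketch below shows only the plain consistent-hashing idea, using a ring of MD5-derived virtual points, and is not Helix's algorithm. ConsistentHashAssigner, the virtual-node count and the resource_index partition naming are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative consistent-hash placement, NOT Helix's RUSH-based algorithm:
// each instance owns several virtual points on a hash ring, and a partition's replicas
// go to the first distinct instances found walking clockwise from the partition's hash.
final class ConsistentHashAssigner {
  private final TreeMap<Long, String> ring = new TreeMap<>();

  ConsistentHashAssigner(List<String> instances, int virtualNodesPerInstance) {
    for (String instance : instances) {
      for (int v = 0; v < virtualNodesPerInstance; v++) {
        ring.put(hash(instance + "#" + v), instance);
      }
    }
  }

  // First 8 bytes of the MD5 digest, read as a signed 64-bit position on the ring.
  static long hash(String key) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
      long h = 0;
      for (int i = 0; i < 8; i++) {
        h = (h << 8) | (digest[i] & 0xFF);
      }
      return h;
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  // Picks `replicas` distinct instances for one partition.
  List<String> replicasFor(String partition, int replicas) {
    if (replicas > new HashSet<>(ring.values()).size()) {
      throw new IllegalArgumentException("more replicas than instances");
    }
    long start = hash(partition);
    List<String> candidates = new ArrayList<>(ring.tailMap(start).values()); // clockwise from start
    candidates.addAll(ring.headMap(start).values());                          // then wrap around
    List<String> chosen = new ArrayList<>();
    for (String instance : candidates) {
      if (!chosen.contains(instance)) {
        chosen.add(instance);
        if (chosen.size() == replicas) {
          break;
        }
      }
    }
    return chosen;
  }

  // Partition names follow a resource_index pattern here (an assumption for illustration).
  Map<String, List<String>> assign(String resource, int numPartitions, int replicas) {
    Map<String, List<String>> assignment = new LinkedHashMap<>();
    for (int p = 0; p < numPartitions; p++) {
      String partition = resource + "_" + p;
      assignment.put(partition, replicasFor(partition, replicas));
    }
    return assignment;
  }
}
```

With the six localhost instances, new ConsistentHashAssigner(instances, 50).assign("TestDB", 10, 3) gives every partition three distinct instances. The virtual points spread load more evenly than one point per instance, and adding or removing an instance only changes the replica lists that include, or would now include, that instance.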

Participant

Starting up a participant is pretty straightforward. After the Helix manager instance is created, the only thing that needs to be registered is the state model factory. The methods on the state model are called when the controller sends transitions to the participant.

    manager = HelixManagerFactory.getZKHelixManager(clusterName,
                                                    instanceName,
                                                    InstanceType.PARTICIPANT,
                                                    zkConnectString);
    StateMachineEngine stateMach = manager.getStateMachineEngine();
    // Create a stateModelFactory that returns a state model object for each partition.
    stateModelFactory = new OnlineOfflineStateModelFactory();
    // stateModelType is the name of the state model definition, e.g. "OnlineOffline"
    stateMach.registerStateModelFactory(stateModelType, stateModelFactory);
    manager.connect();
    public class OnlineOfflineStateModelFactory extends
        StateModelFactory<StateModel> {
      @Override
      public StateModel createNewStateModel(String stateUnitKey) {
        OnlineOfflineStateModel stateModel = new OnlineOfflineStateModel();
        return stateModel;
      }

      @StateModelInfo(states = "{'OFFLINE','ONLINE'}", initialState = "OFFLINE")
      public static class OnlineOfflineStateModel extends StateModel {
        @Transition(from = "OFFLINE", to = "ONLINE")
        public void onBecomeOnlineFromOffline(Message message,
            NotificationContext context) {
          System.out.println("OnlineOfflineStateModel.onBecomeOnlineFromOffline()");
          // Application logic to handle the transition
        }

        @Transition(from = "ONLINE", to = "OFFLINE")
        public void onBecomeOfflineFromOnline(Message message,
            NotificationContext context) {
          System.out.println("OnlineOfflineStateModel.onBecomeOfflineFromOnline()");
          // Application logic to handle the transition
        }
      }
    }
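To see what the factory and the @Transition methods amount to without a running cluster, here is a plain-Java model that assumes nothing from Helix: one state object is created lazily per partition (the role createNewStateModel plays), and a transition is applied only if the OnlineOffline model allows it and the partition is currently in the expected from-state. OnlineOfflineModel, PartitionStateRegistry and applyTransition are hypothetical names; in Helix itself, transition messages are routed to the annotated methods shown above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for one partition's state model (not a Helix class).
final class OnlineOfflineModel {
  private volatile String state = "OFFLINE"; // initial state, matching initialState in @StateModelInfo

  String state() {
    return state;
  }

  void setState(String state) {
    this.state = state;
  }
}

// Keeps one model per partition and applies only the transitions the model allows.
final class PartitionStateRegistry {
  private static final Set<String> ALLOWED = Set.of("OFFLINE->ONLINE", "ONLINE->OFFLINE");

  private final Map<String, OnlineOfflineModel> models = new ConcurrentHashMap<>();
  private final Function<String, OnlineOfflineModel> factory;

  PartitionStateRegistry(Function<String, OnlineOfflineModel> factory) {
    this.factory = factory;
  }

  // Like createNewStateModel: the factory runs once per partition, on first use.
  OnlineOfflineModel modelFor(String partition) {
    return models.computeIfAbsent(partition, factory);
  }

  // Check-then-set is synchronized so two transitions cannot interleave.
  synchronized void applyTransition(String partition, String from, String to) {
    if (!ALLOWED.contains(from + "->" + to)) {
      throw new IllegalArgumentException("No transition " + from + " -> " + to);
    }
    OnlineOfflineModel model = modelFor(partition);
    if (!model.state().equals(from)) {
      throw new IllegalStateException(partition + " is " + model.state() + ", not " + from);
    }
    model.setState(to);
  }
}
```

For example, new PartitionStateRegistry(p -> new OnlineOfflineModel()) starts every partition in OFFLINE; applying OFFLINE to ONLINE twice for the same partition fails the second time because the partition is no longer OFFLINE.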

Controller Code

The controller needs to know about all changes in the cluster. Helix comes with a default implementation that handles all of them. If you need to add functionality, see GenericHelixController for how to configure the pipeline.

    manager = HelixManagerFactory.getZKHelixManager(clusterName,
                                                    instanceName,
                                                    InstanceType.CONTROLLER,
                                                    zkConnectString);
    manager.connect();
    GenericHelixController controller = new GenericHelixController();
    manager.addConfigChangeListener(controller);
    manager.addLiveInstanceChangeListener(controller);
    manager.addIdealStateChangeListener(controller);
    manager.addExternalViewChangeListener(controller);
    manager.addControllerListener(controller);

The snippet above shows how the controller is started. You can also start the controller using the command line interface.

cd helix
mvn clean install -Dmaven.test.skip=true
cd helix-core/target/helix-core-pkg/bin
chmod +x *
./run-helix-controller.sh --zkSvr <ZookeeperServerAddress(Required)>  --cluster <Cluster name (Required)>

Spectator Code

A spectator simply observes the cluster and is notified when the state of the system changes. Helix consolidates the state of the entire cluster in one znode called ExternalView. Helix provides a default implementation, RoutingTableProvider, that caches the cluster state and updates it whenever the cluster changes.

manager = HelixManagerFactory.getZKHelixManager(clusterName,
                                                instanceName,
                                                InstanceType.SPECTATOR,
                                                zkConnectString);
manager.connect();
RoutingTableProvider routingTableProvider = new RoutingTableProvider();
manager.addExternalViewChangeListener(routingTableProvider);

To find out which instances are serving a partition, use the following API:

instances = routingTableProvider.getInstances("DBNAME", "PARTITION_NAME", "PARTITION_STATE");
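RoutingTableProvider answers the lookup above from its cached copy of the ExternalView. As a rough illustration of that kind of lookup only, not of RoutingTableProvider's implementation, the sketch below keeps a resource → partition → (instance → state) map and filters instances by state. InMemoryRoutingTable and its update hook are hypothetical, the sample instance names in the usage are made up, and the sketch returns plain instance-name strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative cache of external-view data, not RoutingTableProvider's implementation:
// resource -> (partition -> (instance -> state)).
final class InMemoryRoutingTable {
  private final Map<String, Map<String, Map<String, String>>> view = new ConcurrentHashMap<>();

  // Hypothetical hook: called with the full mapping of a resource whenever it changes.
  // The whole per-resource map is swapped, so readers never see a half-applied update.
  void update(String resource, Map<String, Map<String, String>> partitionToInstanceStates) {
    view.put(resource, partitionToInstanceStates);
  }

  // Instances that host `partition` of `resource` in `state`, sorted for stable output.
  List<String> getInstances(String resource, String partition, String state) {
    Map<String, String> instanceStates = view
        .getOrDefault(resource, Collections.emptyMap())
        .getOrDefault(partition, Collections.emptyMap());
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, String> entry : instanceStates.entrySet()) {
      if (entry.getValue().equals(state)) {
        result.add(entry.getKey());
      }
    }
    Collections.sort(result);
    return result;
  }
}
```

Unknown resources or partitions simply yield an empty list, which is usually what a router wants: no known replica in that state means nothing to route to.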

Helix Admin operations

Helix provides multiple ways to administer the cluster. It has a command line interface and also a REST interface.

cd helix
mvn clean install -Dmaven.test.skip=true
cd helix-core/target/helix-core-pkg/bin
chmod +x *
./helix-admin.sh --help
Provide zookeeper address. Required for all commands
   --zkSvr <ZookeeperServerAddress(Required)>

Add a new cluster
   --addCluster <clusterName>

Add a new Instance to a cluster
   --addNode <clusterName InstanceAddress(host:port)>

Add a State model to a cluster
   --addStateModelDef <clusterName <filename>>

Add a resource to a cluster
   --addResource <clusterName resourceName partitionNum stateModelRef <mode(AUTO_REBALANCE|AUTO|CUSTOM)>>

Upload an IdealState(Partition to Node Mapping)
   --addIdealState <clusterName resourceName <filename>>

Delete a cluster
   --dropCluster <clusterName>

Drop an existing resource from a cluster
   --dropResource <clusterName resourceName>

Drop an existing Instance from a cluster
   --dropNode <clusterName InstanceAddress(host:port)>

Enable/disable the entire cluster. This basically pauses the controller, which means no transitions will be triggered, but the existing nodes in the cluster continue to function
   --enableCluster <clusterName>

Enable/disable an Instance. Useful to take a faulty node out of the cluster.
   --enableInstance <clusterName InstanceName true/false>

Enable/disable a partition
   --enablePartition <clusterName instanceName resourceName partitionName true/false>

Query info of a cluster
   --listClusterInfo <clusterName>

List existing clusters
   --listClusters

Query info of an Instance in a cluster
   --listInstanceInfo <clusterName InstanceName>

List Instances in a cluster
   --listInstances <clusterName>

Query info of a partition
   --listPartitionInfo <clusterName resourceName partitionName>

Query info of a resource
   --listResourceInfo <clusterName resourceName>

List resources hosted in a cluster
   --listResources <clusterName>

Query info of a state model in a cluster
   --listStateModel <clusterName stateModelName>

Query info of state models in a cluster
   --listStateModels <clusterName>

Idealstate modes and configuration

  • AUTO mode: The partition-to-node assignment is pre-generated using consistent hashing.

    setupTool.addResourceToCluster(clusterName, resourceName, partitionNumber, "MasterSlave");
    setupTool.rebalanceStorageCluster(clusterName, resourceName, replicas);

  • AUTO_REBALANCE mode: The partition-to-node assignment is generated dynamically by the cluster manager, based on the nodes that are currently up and running.

    setupTool.addResourceToCluster(clusterName, resourceName, partitionNumber, "MasterSlave", "AUTO_REBALANCE");
    setupTool.rebalanceStorageCluster(clusterName, resourceName, replicas);

  • CUSTOMIZED mode: Allows one to set the ideal state (the partition-to-node mapping) explicitly; it is pre-generated from a JSON format file.

    setupTool.addIdealState(clusterName, resourceName, idealStateJsonFile);
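In CUSTOMIZED mode the application supplies the exact partition → instance → state mapping itself. As a sketch, assuming a MasterSlave resource where each partition should have one MASTER and the remaining replicas as SLAVE, the helper below builds such a mapping in memory by round-robin placement. CustomMappingBuilder and the resource_index partition naming are hypothetical, and turning the map into the JSON file that addIdealState reads is not shown.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds partition -> (instance -> state) for a MasterSlave resource.
final class CustomMappingBuilder {
  // Replica r of partition p goes to instance (p + r) mod n; replica 0 is the MASTER,
  // the others are SLAVEs. Rotating the start index spreads MASTER roles across instances.
  static Map<String, Map<String, String>> build(String resource, int numPartitions,
                                                List<String> instances, int replicas) {
    if (replicas < 1 || replicas > instances.size()) {
      throw new IllegalArgumentException("replicas must be between 1 and " + instances.size());
    }
    Map<String, Map<String, String>> mapping = new LinkedHashMap<>();
    for (int p = 0; p < numPartitions; p++) {
      Map<String, String> instanceStates = new LinkedHashMap<>();
      for (int r = 0; r < replicas; r++) {
        String instance = instances.get((p + r) % instances.size());
        instanceStates.put(instance, r == 0 ? "MASTER" : "SLAVE");
      }
      mapping.put(resource + "_" + p, instanceStates);
    }
    return mapping;
  }
}
```

Because replica 0 rotates with the partition index, MASTER roles are spread across instances instead of piling onto the first one, which is the kind of decision CUSTOMIZED mode leaves to the application.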

Configuring state model

StateModelConfigGenerator generator = new StateModelConfigGenerator();
ZNRecord stateModelConfig = generator.generateConfigForOnlineOffline();
StateModelDefinition stateModelDef = new StateModelDefinition(stateModelConfig);
ClusterSetup setupTool = new ClusterSetup(zkConnectString);
setupTool.addStateModelDef(clusterName, stateModelName, stateModelDef);
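A state model definition boils down to a set of states plus the transitions allowed between them, and a replica that cannot jump straight to its target state must pass through intermediate states. As an illustration only, the sketch below assumes the usual MasterSlave transitions (OFFLINE→SLAVE, SLAVE→MASTER, MASTER→SLAVE, SLAVE→OFFLINE, leaving out DROPPED) and finds the next hop toward a target state with a breadth-first search. TransitionGraph is a hypothetical class; Helix computes this from the registered definition, and the sketch does not reproduce that code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical transition graph for a state model (not Helix's StateModelDefinition).
final class TransitionGraph {
  private final Map<String, List<String>> edges; // state -> states reachable in one transition

  TransitionGraph(Map<String, List<String>> edges) {
    this.edges = edges;
  }

  // First state to move to on a shortest path from `from` to `to`; empty if none is needed or possible.
  Optional<String> nextHop(String from, String to) {
    if (from.equals(to)) {
      return Optional.empty();
    }
    Map<String, String> firstHop = new HashMap<>(); // visited state -> first step taken to reach it
    Deque<String> queue = new ArrayDeque<>();
    for (String next : edges.getOrDefault(from, List.of())) {
      firstHop.put(next, next);
      queue.add(next);
    }
    while (!queue.isEmpty()) {
      String state = queue.poll();
      if (state.equals(to)) {
        return Optional.of(firstHop.get(state));
      }
      for (String next : edges.getOrDefault(state, List.of())) {
        if (!next.equals(from) && !firstHop.containsKey(next)) {
          firstHop.put(next, firstHop.get(state));
          queue.add(next);
        }
      }
    }
    return Optional.empty();
  }

  // MasterSlave transitions without DROPPED, for illustration.
  static TransitionGraph masterSlave() {
    return new TransitionGraph(Map.of(
        "OFFLINE", List.of("SLAVE"),
        "SLAVE", List.of("MASTER", "OFFLINE"),
        "MASTER", List.of("SLAVE")));
  }
}
```

With this graph, moving a replica from OFFLINE to MASTER first goes to SLAVE, and taking a MASTER offline also goes through SLAVE, which is why a custom state model must make every target state reachable through its declared transitions.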

See StateModelConfigGenerator for more information on creating a custom state model.

Messaging Api usage

See BootstrapProcess.java in the examples package to see how participants can exchange messages with each other.

    ClusterMessagingService messagingService = manager.getMessagingService();
    // Construct the message
    Message requestBackupUriRequest = new Message(
        MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
    requestBackupUriRequest
        .setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL);
    requestBackupUriRequest.setMsgState(MessageState.NEW);
    // Set the recipient criteria: all nodes that satisfy the criteria will receive the message
    Criteria recipientCriteria = new Criteria();
    recipientCriteria.setInstanceName("*");
    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
    recipientCriteria.setResource("MyDB");
    recipientCriteria.setPartition("");
    // The message should be processed only by the process that is active at the time it is sent.
    // This means that if the recipient is restarted after the message is sent, the message will not be processed.
    recipientCriteria.setSessionSpecific(true);
    // Wait for 30 seconds
    int timeout = 30000;
    // The handler that will be invoked when any recipient responds to the message.
    BootstrapReplyHandler responseHandler = new BootstrapReplyHandler();
    // This will return only after all recipients respond or after the timeout.
    int sentMessageCount = messagingService.sendAndWait(recipientCriteria,
        requestBackupUriRequest, responseHandler, timeout);

For more details on the messaging service, see ClusterMessagingService.
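The comment on sendAndWait says it returns only after all recipients respond or the timeout expires. The sketch below models just that waiting behavior in plain Java with a CountDownLatch, assuming each recipient replies on its own thread; it does not use ClusterMessagingService, and ReplyCollector and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "send to N recipients, then wait for all replies or a timeout".
final class ReplyCollector {
  private final CountDownLatch pending;
  private final List<String> replies = new CopyOnWriteArrayList<>();

  ReplyCollector(int expectedReplies) {
    this.pending = new CountDownLatch(expectedReplies);
  }

  // Called once per reply, possibly from many threads (plays the role of the reply handler).
  void onReply(String reply) {
    replies.add(reply);
    pending.countDown();
  }

  // Blocks until every expected reply has arrived or `timeoutMs` elapses.
  // Returns true only if all replies arrived in time.
  boolean awaitAll(long timeoutMs) {
    try {
      return pending.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Snapshot of the replies received so far.
  List<String> replies() {
    return new ArrayList<>(replies);
  }
}
```

A latch fits this contract well: it counts down once per reply, and await with a timeout already reports whether the count reached zero, so "all responded" and "timed out" are distinguishable without extra bookkeeping.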

\ No newline at end of file Added: incubator/helix/site-content/Sample_App.html URL: http://svn.apache.org/viewvc/incubator/helix/site-content/Sample_App.html?rev=1480516&view=auto ============================================================================== --- incubator/helix/site-content/Sample_App.html (added) +++ incubator/helix/site-content/Sample_App.html Thu May 9 01:27:17 2013 @@ -0,0 +1,369 @@

RabbitMQ Consumer Group

RabbitMQ is well-known open source software that provides robust messaging for applications.

One of the commonly implemented recipes using this software is a work queue. http://www.rabbitmq.com/tutorials/tutorial-four-java.html describes the use case where

  • A producer sends a message with a routing key.
  • The message goes to the queues whose binding key exactly matches the routing key of the message.
  • There are multiple consumers, and each consumer is interested in processing only a subset of the messages, by binding to the keys it is interested in.

The example provided here describes how multiple consumers can be started to process all the tasks.

While this works, in production systems one needs the following:
  • Ability to handle failures: when a consumer fails, another consumer must be started, or the other consumers must start processing the messages that should have been processed by the failed consumer.
  • Ability to scale: when the existing consumers cannot keep up with the task generation rate, new consumers will be added, and the tasks must be redistributed among all the consumers.

In this sample app, we explain how this set of consumers can be grouped together to handle consumer failures and expansion automatically.

Mapping this use case to Helix is pretty easy, as the binding key/routing key is equivalent to a partition.

Let's take a real example. Say a topic has 6 partitions and we have 2 consumers to process all the queues. We want all 6 queues to be evenly divided between the 2 consumers, so each consumer processes 3 queues. Eventually, when the system scales, we add more consumers to keep up; with a third consumer, each consumer processes tasks from 2 queues. Now say that a consumer fails and the number of active consumers is reduced to 2 again. This means each remaining consumer must process 3 queues.
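The arithmetic in this example is an even split: with P partitions and C consumers, every consumer gets either floor(P/C) or ceil(P/C) partitions. A short sketch that reproduces the 6-partition walkthrough (EvenSplit is a hypothetical name, and this only computes counts; it is not Helix's rebalancer):

```java
import java.util.Arrays;

// Hypothetical helper: how many partitions each consumer should own under an even split.
final class EvenSplit {
  // The first (partitions % consumers) consumers take one extra partition.
  static int[] shares(int partitions, int consumers) {
    if (consumers <= 0) {
      throw new IllegalArgumentException("need at least one consumer");
    }
    int[] result = new int[consumers];
    for (int i = 0; i < consumers; i++) {
      result[i] = partitions / consumers + (i < partitions % consumers ? 1 : 0);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(shares(6, 2))); // [3, 3]     two consumers
    System.out.println(Arrays.toString(shares(6, 3))); // [2, 2, 2]  after adding a third
    System.out.println(Arrays.toString(shares(6, 4))); // [2, 2, 1, 1] an uneven case
  }
}
```

The uneven case shows why "evenly" can only mean "differing by at most one" once the partition count is not a multiple of the consumer count.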

We showcase how such a dynamic App can be developed using Helix.

Try it

Before getting into the details of how to develop such an app using Helix, you can try the following steps to get a feel for it.

git clone git@github.com:linkedin/helix.git
cd helix
./build
export HELIX_PKG_ROOT=`pwd`/helix-core/target/helix-core-pkg

OR download the latest 0.5.28 release tarball from here

tar -xzvf helix-core-pkg-0.5.28.tar.gz
export HELIX_PKG_ROOT=`pwd`/helix-core-pkg

Download the rabbitmq-consumer-group recipe from here

tar -xzvf rabbitmq-consumer-group-0.5.28.tar.gz
export HELIX_RABBITMQ_ROOT=`pwd`/rabbitmq-consumer-group/

chmod +x $HELIX_PKG_ROOT/bin/*
chmod +x $HELIX_RABBITMQ_ROOT/bin/*

Install Rabbit MQ

Setting up RabbitMQ on a local box is straightforward. You can find the instructions here http://www.rabbitmq.com/download.html

Start ZK

Start ZooKeeper at port 2199

$HELIX_PKG_ROOT/bin/start-standalone-zookeeper 2199

Setup the consumer group cluster

This sets up the cluster by creating a rabbitmq-consumer-group cluster and adding a topic resource with 6 queues.

$HELIX_RABBITMQ_ROOT/bin/setup-cluster.sh localhost:2199

Add consumers

Start 2 consumers in 2 different terminals. Each consumer is given a unique id.

# start-consumer.sh zookeeperAddress (e.g. localhost:2181) consumerId rabbitmqServer (e.g. localhost)
$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 0 localhost
$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 1 localhost

Start HelixController

Now start a Helix controller that starts managing the "rabbitmq-consumer-group" cluster.

$HELIX_RABBITMQ_ROOT/bin/start-cluster-manager.sh localhost:2199

Send messages to the Topic

Start sending messages to the topic. This script randomly selects a routing key (1-6) and sends the message to the topic. Based on the key, messages get routed to the appropriate queue.

$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 20

After running this, you should see all 20 messages being processed by 2 consumers.

Add another consumer

Once a new consumer is started, Helix detects it. To balance the load among the 3 consumers, it deallocates 1 partition from each of the existing consumers and allocates those partitions to the new consumer, so each consumer is now processing only 2 queues. Helix makes sure that, for a given partition, the old consumer is asked to stop consuming before the new consumer is asked to start. The transitions for different partitions, however, can happen in parallel.

$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 2 localhost
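The ordering guarantee described here (for a given partition, the old owner goes OFFLINE before the new owner goes ONLINE, while different partitions proceed independently) can be modeled in a few lines. This is a toy model assuming one executor task per moving partition; it is not how the Helix controller schedules its messages, and Handoff, moveTo and the log format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model (not Helix's controller) of handing partitions over to a new consumer.
final class Handoff {
  // Shared event log; entries look like "OFFLINE q3 c0" or "ONLINE q3 c2".
  final List<String> log = new CopyOnWriteArrayList<>();

  // moves: partition -> old owner. One task per partition and the tasks run in parallel,
  // but inside each task the old owner stops before the new owner starts.
  void moveTo(String newOwner, Map<String, String> moves) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, moves.size()));
    moves.forEach((partition, oldOwner) -> pool.submit(() -> {
      log.add("OFFLINE " + partition + " " + oldOwner); // e.g. interrupt the old consumer thread
      log.add("ONLINE " + partition + " " + newOwner);  // e.g. start consuming on the new owner
    }));
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // True if, for every partition, its OFFLINE entry appears before its ONLINE entry.
  boolean orderRespected(Iterable<String> partitions) {
    List<String> snapshot = new ArrayList<>(log);
    for (String p : partitions) {
      int off = -1;
      int on = -1;
      for (int i = 0; i < snapshot.size(); i++) {
        String[] parts = snapshot.get(i).split(" ");
        if (parts[1].equals(p)) {
          if (parts[0].equals("OFFLINE")) {
            off = i;
          } else {
            on = i;
          }
        }
      }
      if (off < 0 || on < 0 || off > on) {
        return false;
      }
    }
    return true;
  }
}
```

The entries for different partitions may interleave in any order across runs; only the per-partition OFFLINE-then-ONLINE order is guaranteed, which mirrors the guarantee described above.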

Send messages to the topic again.

$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 100

You should see that messages are now received by all 3 consumers.

Stop a consumer

Press CTRL+C in any of the consumer terminals and notice that Helix detects the consumer failure and distributes the 2 partitions that were processed by the failed consumer to the remaining 2 active consumers.

How does it work

Find the entire code here.

Cluster setup

This step creates a znode in ZooKeeper for the cluster and adds the state model. We use the OnlineOffline state model since there is no need for other states: the consumer is either processing a queue or it is not.

It creates a resource called rabbitmq-consumer-group with 6 partitions. The execution mode is set to AUTO_REBALANCE. This means that Helix controls the assignment of partitions to consumers and automatically distributes the partitions evenly among the active consumers. When a consumer is added or removed, it ensures that only a minimal number of partitions are moved.

    zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
        ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
    ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);

    // add cluster
    admin.addCluster(clusterName, true);

    // add state model definition
    StateModelConfigGenerator generator = new StateModelConfigGenerator();
    admin.addStateModelDef(clusterName, "OnlineOffline",
        new StateModelDefinition(generator.generateConfigForOnlineOffline()));

    // add the resource "rabbitmq-consumer-group", which has 6 partitions (one per queue)
    String resourceName = "rabbitmq-consumer-group";
    admin.addResource(clusterName, resourceName, 6, "OnlineOffline", "AUTO_REBALANCE");
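AUTO_REBALANCE aims for an even spread while moving as few partitions as possible. One simple way to get that behavior, sketched below under stated assumptions, is a sticky rebalance: each live consumer keeps its current partitions up to its fair share, partitions of dead consumers and any surplus go into a pool, and the pool is handed to consumers that are below their share. This is not the algorithm Helix uses; StickyRebalancer, moved and the p0 to p5 / c0 to c2 names in the usage are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sticky rebalancer: even shares, minimal movement. Not Helix's algorithm.
final class StickyRebalancer {
  // current: partition -> owner (owners may be dead). Returns a new partition -> owner map.
  static Map<String, String> rebalance(List<String> partitions, Map<String, String> current,
                                       List<String> liveConsumers) {
    int c = liveConsumers.size();
    if (c == 0) {
      throw new IllegalArgumentException("no live consumers");
    }
    // Group currently owned partitions by live owner; partitions of dead owners go to the pool.
    Map<String, List<String>> owned = new LinkedHashMap<>();
    for (String consumer : liveConsumers) {
      owned.put(consumer, new ArrayList<>());
    }
    Deque<String> pool = new ArrayDeque<>();
    for (String p : partitions) {
      List<String> list = owned.get(current.get(p));
      if (list != null) {
        list.add(p);
      } else {
        pool.add(p);
      }
    }
    // Capacity: floor(P/C) each, plus one extra for the (P % C) consumers that currently own the most.
    List<String> byLoad = new ArrayList<>(liveConsumers);
    byLoad.sort((a, b) -> owned.get(b).size() - owned.get(a).size());
    Map<String, Integer> capacity = new HashMap<>();
    for (int i = 0; i < c; i++) {
      capacity.put(byLoad.get(i), partitions.size() / c + (i < partitions.size() % c ? 1 : 0));
    }
    Map<String, String> result = new TreeMap<>();
    // Keep partitions up to capacity; any surplus goes back to the pool.
    for (String consumer : liveConsumers) {
      List<String> list = owned.get(consumer);
      for (int i = 0; i < list.size(); i++) {
        if (i < capacity.get(consumer)) {
          result.put(list.get(i), consumer);
        } else {
          pool.add(list.get(i));
        }
      }
    }
    // Fill consumers that are below capacity from the pool.
    for (String consumer : liveConsumers) {
      int have = Math.min(owned.get(consumer).size(), capacity.get(consumer));
      while (have < capacity.get(consumer) && !pool.isEmpty()) {
        result.put(pool.poll(), consumer);
        have++;
      }
    }
    return result;
  }

  // Number of partitions whose owner differs between two assignments.
  static int moved(Map<String, String> before, Map<String, String> after) {
    int n = 0;
    for (Map.Entry<String, String> e : after.entrySet()) {
      if (!e.getValue().equals(before.get(e.getKey()))) {
        n++;
      }
    }
    return n;
  }
}
```

Replaying the walkthrough, going from 2 to 3 consumers moves exactly 2 partitions (one from each old consumer), and losing one of the 3 consumers again moves just its 2 partitions. Giving the extra slots to the consumers that already hold the most partitions is what keeps uneven cases from shuffling more than necessary.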

Starting the consumers

To join the cluster, the only things a consumer needs to know are the ZooKeeper address, the cluster name, and its consumer id. It does not need to know anything else about the cluster.

    _manager =
        HelixManagerFactory.getZKHelixManager(_clusterName,
                                              _consumerId,
                                              InstanceType.PARTICIPANT,
                                              _zkAddr);

    StateMachineEngine stateMach = _manager.getStateMachineEngine();
    ConsumerStateModelFactory modelFactory =
        new ConsumerStateModelFactory(_consumerId, _mqServer);
    stateMach.registerStateModelFactory("OnlineOffline", modelFactory);

    _manager.connect();

Once the consumer has registered the state model and the controller is started, the consumer starts getting callbacks (onBecomeOnlineFromOffline) for the partitions it needs to host. All it needs to do as part of the callback is start consuming messages from the appropriate queue. Similarly, when the controller deallocates a partition from a consumer, it fires onBecomeOfflineFromOnline for that partition. As part of this transition, the consumer stops consuming from that queue.

  @Transition(to = "ONLINE", from = "OFFLINE")
  public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
  {
    LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);

    if (_thread == null)
    {
      LOG.debug("Starting ConsumerThread for " + _partition + "...");
      _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
      _thread.start();
      LOG.debug("Starting ConsumerThread for " + _partition + " done");
    }
  }

  @Transition(to = "OFFLINE", from = "ONLINE")
  public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
      throws InterruptedException
  {
    LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);

    if (_thread != null)
    {
      LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");

      _thread.interrupt();
      _thread.join(2000);
      _thread = null;
      LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
    }
  }
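The transition handlers above start a ConsumerThread on ONLINE, and interrupt and join it on OFFLINE. A self-contained sketch of a thread that honors that contract is below. It assumes the work source is an in-memory BlockingQueue standing in for the RabbitMQ queue; the recipe's actual ConsumerThread is not shown in this page, and QueueConsumer is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the recipe's ConsumerThread: drains an in-memory queue
// until it is interrupted (the OFFLINE transition calls interrupt() and then join()).
final class QueueConsumer extends Thread {
  private final BlockingQueue<String> queue;
  final AtomicInteger processed = new AtomicInteger();

  QueueConsumer(String name, BlockingQueue<String> queue) {
    super(name);
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      while (!isInterrupted()) {
        // take() blocks until a message arrives and throws InterruptedException on interrupt().
        queue.take();
        processed.incrementAndGet(); // application logic for the message would go here
      }
    } catch (InterruptedException e) {
      // Interrupted by the OFFLINE transition: fall through so the thread exits promptly.
    }
  }
}
```

Using a blocking take() rather than polling in a loop means an idle consumer costs nothing, and interrupt() is enough to wake it up and end it, which is what lets the OFFLINE handler bound its wait with join(2000).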
\ No newline at end of file