Subject: svn commit: r1479756 [2/39] - in /incubator/helix/site-content: ./ apidocs/reference/org/apache/helix/ apidocs/reference/org/apache/helix/agent/ apidocs/reference/org/apache/helix/manager/zk/ apidocs/reference/org/apache/helix/messaging/handling/ apido...
Date: Tue, 07 May 2013 03:11:00 -0000
To: commits@helix.incubator.apache.org
From: kishoreg@apache.org
Message-Id: <20130507031203.7B47A23889FD@eris.apache.org>

Modified: incubator/helix/site-content/Features.html
URL: http://svn.apache.org/viewvc/incubator/helix/site-content/Features.html?rev=1479756&r1=1479755&r2=1479756&view=diff
==============================================================================
--- incubator/helix/site-content/Features.html (original)
+++ incubator/helix/site-content/Features.html Tue May 7 03:10:53 2013
@@ -1,13 +1,13 @@
- + Apache Helix -
@@ -52,6 +52,9 @@
  • Quick Start
  • +
  • Core concept +
  • +
  • Tutorial
  • @@ -162,7 +165,7 @@ -
  • Last Published: 2013-04-27
  • +
  • Last Published: 2013-05-06
  • @@ -187,26 +190,111 @@ software distributed under the License i "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations -under the License. -->

    Partition Placement

    The placement of partitions in a DDS is very critical for reliability and scalability of the system. For example, when a node fails, it is important that the partitions hosted on that node are reallocated evenly among the remaining nodes. Consistent hashing is one such algorithm that can guarantee this. Helix by default comes with a variant of consistent hashing based of the RUSH algorithm. This means given a number of partitions, replicas and number of nodes Helix does the automatic assignment of partition to nodes such that

    +under the License. -->

    CONFIGURING IDEALSTATE

    Read concepts page for definition of Idealstate.

    The placement of partitions in a DDS is critical to the reliability and scalability of the system. For example, when a node fails, it is important that the partitions hosted on that node are reallocated evenly among the remaining nodes. Consistent hashing is one algorithm that can guarantee this. By default, Helix comes with a variant of consistent hashing based on the RUSH algorithm.

    This means that, given a number of partitions, replicas, and nodes, Helix automatically assigns partitions to nodes such that

    • Each node has the same number of partitions and replicas of the same partition do not stay on the same node.
    • When a node fails, the partitions will be equally distributed among the remaining nodes
    • When new nodes are added, the number of partitions moved will be minimized along with satisfying the above two criteria.
    • -

    In simple terms, partition assignment can be defined as the mapping of (Replica, State) to a Node in the cluster. For example, let's say the system has 2 partitions (P1, P2), each partition has 2 replicas, and there are 2 nodes (N1, N2) and two possible states, Master and Slave.

    The partition assignment table can look like

    -
    P1 -> {N1:M, N2:S}
    -P2 -> {N1:S, N2:M}
    -

    This means Partition P1 must be a Master at N1 and Slave at N2 and vice versa for P2

    Helix provides multiple ways to control the partition placement. See Execution modes section for more info on this.

    IdealState execution modes Idealstate is defined as the state of the DDS when all nodes are up and running and healthy. Helix uses this as the target state of the system and computes the appropriate transitions needed in the system to bring it to a stable state.

    Helix supports 3 different execution modes which allows application to explicitly control the placement and state of the replica.

    AUTO_REBALANCE

    When the idealstate mode is set to AUTO_REBALANCE, Helix controls both the location of the replica along with the state. This option is useful for applications where creation of a replica is not expensive. A typical example is evenly distributing a group of tasks among the currently alive processes. For example, if there are 60 tasks and 4 nodes, Helix assigns 15 tasks to each node. When one node fails Helix redistributes its 15 tasks to the remaining 3 nodes. Similarly, if a node is added, Helix re-allocates 3 tasks from each of the 4 nodes to the 5th node.

    AUTO

    When the idealstate mode is set to AUTO, Helix only controls STATE of the replicas where as the location of the partition is controlled by application. For example the application can say P1->{N1,N2,N3} which means P1 should only exist N1,N2,N3. In this mode when N1 fails, unlike in AUTO-REBALANCE mode the partition is not moved from N1 to others nodes in the cluster. But Helix might decide to change the state of P1 in N2 and N3 based on the system constraints. For example, if a system constraint specified that there should be 1 Master and if the Master failed, then N2 will be made the master.

    CUSTOM

    Helix offers a third mode called CUSTOM, in which application can completely control the placement and state of each replica. Applications will have to implement an interface that Helix will invoke when the cluster state changes. Within this callback, the application can recompute the partition assignment mapping. Helix will then issue transitions to get the system to the final state. Note that Helix will ensure that system constraints are not violated at any time. For example, the current state of the system might be P1 -> {N1:M,N2:S} and the application changes the ideal state to P2 -> {N1:S,N2:M}. Helix will not blindly issue M-S to N1 and S-M to N2 in parallel since it might result in a transient state where both N1 and N2 are masters. Helix will issue S-M to N2 only when N1 has changed its state to S.

    State Machine Configuration

    Helix comes with 3 default state models that are most commonly used. Its possible to have multiple state models in a cluster. Every resource that is added should have a reference to the state model.

    +

    Helix provides multiple ways to control the placement and state of a replica.

    +
                |AUTO REBALANCE|   AUTO     |   CUSTOM  |       
    +            -----------------------------------------
    +   LOCATION | HELIX        |  APP       |  APP      |
    +            -----------------------------------------
    +      STATE | HELIX        |  HELIX     |  APP      |
    +            -----------------------------------------
    +

    HELIX EXECUTION MODE

    Idealstate is defined as the state of the DDS when all nodes are up and running and healthy. Helix uses this as the target state of the system and computes the appropriate transitions needed in the system to bring it to a stable state.

    Helix supports 3 different execution modes which allows application to explicitly control the placement and state of the replica.

    AUTO_REBALANCE

    When the idealstate mode is set to AUTO_REBALANCE, Helix controls both the location of the replica along with the state. This option is useful for applications where creation of a replica is not expensive. Example

    +
    {
    +  "id" : "MyResource",
    +  "simpleFields" : {
    +    "IDEAL_STATE_MODE" : "AUTO_REBALANCE",
    +    "NUM_PARTITIONS" : "3",
    +    "REPLICAS" : "2",
    +    "STATE_MODEL_DEF_REF" : "MasterSlave"
    +  },
    +  "listFields" : {
    +    "MyResource_0" : [],
    +    "MyResource_1" : [],
    +    "MyResource_2" : []
    +  },
    +  "mapFields" : {
    +  }
    +}
    +

    If there are 3 nodes in the cluster, then Helix will internally compute the ideal state as

    +
    {
    +  "id" : "MyResource",
    +  "simpleFields" : {
    +    "NUM_PARTITIONS" : "3",
    +    "REPLICAS" : "2",
    +    "STATE_MODEL_DEF_REF" : "MasterSlave"
    +  },
    +  "mapFields" : {
    +    "MyResource_0" : {
    +      "N1" : "MASTER",
    +      "N2" : "SLAVE"
    +    },
    +    "MyResource_1" : {
    +      "N2" : "MASTER",
    +      "N3" : "SLAVE"
    +    },
    +    "MyResource_2" : {
    +      "N3" : "MASTER",
    +      "N1" : "SLAVE"
    +    }
    +  }
    +}
    +

    Another typical example is evenly distributing a group of tasks among the currently alive processes. For example, if there are 60 tasks and 4 nodes, Helix assigns 15 tasks to each node. When one node fails Helix redistributes its 15 tasks to the remaining 3 nodes. Similarly, if a node is added, Helix re-allocates 3 tasks from each of the 4 nodes to the 5th node.
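
    The arithmetic in this example can be checked with a small sketch. It only computes how many tasks each node ends up with when tasks are spread as evenly as possible; it is not Helix's rebalancing algorithm, and the class and method names (EvenTaskCounts, perNode) are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Helix: counts tasks per node for an even spread.
final class EvenTaskCounts {

    // Returns the number of tasks held by each of nodeCount nodes when
    // taskCount tasks are divided as evenly as possible.
    static int[] perNode(int taskCount, int nodeCount) {
        if (nodeCount <= 0) {
            throw new IllegalArgumentException("nodeCount must be positive");
        }
        int[] counts = new int[nodeCount];
        Arrays.fill(counts, taskCount / nodeCount);
        // Spread any remainder one task at a time over the first nodes.
        for (int i = 0; i < taskCount % nodeCount; i++) {
            counts[i]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(perNode(60, 4))); // [15, 15, 15, 15]
        System.out.println(Arrays.toString(perNode(60, 3))); // [20, 20, 20]
        System.out.println(Arrays.toString(perNode(60, 5))); // [12, 12, 12, 12, 12]
    }
}
```

    With 4 nodes each holds 15 tasks; after a failure the 3 survivors hold 20 each (5 extra apiece); after adding a 5th node each holds 12, so each of the original 4 gives up 3.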

    AUTO

    When the idealstate mode is set to AUTO, Helix controls only the STATE of the replicas, whereas the location of the partition is controlled by the application. Example: the idealstate below indicates that MyResource_0 must be only on node1 and node2, but leaves the assignment of STATE to Helix.

    +
    {
    +  "id" : "MyResource",
    +  "simpleFields" : {
    +    "IDEAL_STATE_MODE" : "AUTO",
    +    "NUM_PARTITIONS" : "3",
    +    "REPLICAS" : "2",
    +    "STATE_MODEL_DEF_REF" : "MasterSlave"
    +  },
    +  "listFields" : {
    +    "MyResource_0" : ["node1", "node2"],
    +    "MyResource_1" : ["node2", "node3"],
    +    "MyResource_2" : ["node3", "node1"]
    +  },
    +  "mapFields" : {
    +  }
    +}
    +

    In this mode when node1 fails, unlike in AUTO_REBALANCE mode, the partition is not moved from node1 to other nodes in the cluster. Instead, Helix will decide to change the state of MyResource_0 on node2 based on the system constraints. For example, if a system constraint specifies that there should be 1 Master and the Master fails, then node2 will be made the new master.

    CUSTOM

    Helix offers a third mode called CUSTOM, in which the application completely controls the placement and state of each replica. Applications have to implement an interface that Helix invokes when the cluster state changes. Within this callback, the application can recompute the idealstate. Helix will then issue the appropriate transitions so that Idealstate and Currentstate converge.

    +
    {
    +  "id" : "MyResource",
    +  "simpleFields" : {
    +    "IDEAL_STATE_MODE" : "CUSTOM",
    +    "NUM_PARTITIONS" : "3",
    +    "REPLICAS" : "2",
    +    "STATE_MODEL_DEF_REF" : "MasterSlave"
    +  },
    +  "mapFields" : {
    +    "MyResource_0" : {
    +      "N1" : "MASTER",
    +      "N2" : "SLAVE"
    +    },
    +    "MyResource_1" : {
    +      "N2" : "MASTER",
    +      "N3" : "SLAVE"
    +    },
    +    "MyResource_2" : {
    +      "N3" : "MASTER",
    +      "N1" : "SLAVE"
    +    }
    +  }
    +}
    +

    For example, the current state of the system might be MyResource_0 -> {N1:MASTER,N2:SLAVE} and the application changes the ideal state to MyResource_0 -> {N1:SLAVE,N2:MASTER}. Helix will not blindly issue MASTER>SLAVE to N1 and SLAVE>MASTER to N2 in parallel, since that might result in a transient state where both N1 and N2 are masters. Helix will first issue MASTER>SLAVE to N1 and, after it has completed, will issue SLAVE>MASTER to N2.
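
    The ordering rule in this example can be sketched as follows. This is a simplified illustration under the assumption of a single-master constraint, not the Helix controller's actual logic; SafeTransitionOrder and plan are hypothetical names. Demotions out of MASTER are emitted before any other transition, so executing the steps one after another never leaves two masters.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Helix code: orders per-node transitions for one
// partition so that demotions from MASTER come before promotions.
final class SafeTransitionOrder {

    static List<String> plan(Map<String, String> current, Map<String, String> ideal) {
        List<String> demotions = new ArrayList<>();
        List<String> others = new ArrayList<>();
        for (Map.Entry<String, String> entry : ideal.entrySet()) {
            String node = entry.getKey();
            String from = current.get(node);
            String to = entry.getValue();
            // Nodes with no known current state, or already in the target state, need no step here.
            if (from == null || from.equals(to)) {
                continue;
            }
            String step = node + ": " + from + "->" + to;
            if ("MASTER".equals(from)) {
                demotions.add(step);
            } else {
                others.add(step);
            }
        }
        List<String> ordered = new ArrayList<>(demotions);
        ordered.addAll(others);
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, String> current = new LinkedHashMap<>();
        current.put("N1", "MASTER");
        current.put("N2", "SLAVE");
        Map<String, String> ideal = new LinkedHashMap<>();
        ideal.put("N2", "MASTER");
        ideal.put("N1", "SLAVE");
        // Prints [N1: MASTER->SLAVE, N2: SLAVE->MASTER]
        System.out.println(plan(current, ideal));
    }
}
```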

    State Machine Configuration

    Helix comes with 3 default state models that are most commonly used. It is possible to have multiple state models in a cluster. Every resource that is added should have a reference to the state model.

    • MASTER-SLAVE: Has 3 states: OFFLINE, SLAVE, MASTER. Max masters is 1. The number of slaves is based on the replication factor, which can be specified while adding the resource.
    • ONLINE-OFFLINE: Has 2 states: OFFLINE and ONLINE. A very simple state model; most applications start off with it.
    • LEADER-STANDBY: 1 leader and many standbys. In general the standbys are idle.
    • -

    Apart from providing the state machine configuration, one can specify the constraints of states and transitions.

    For example one can say Master:1. Max number of replicas in Master state at any time is 1. OFFLINE-SLAVE:5 Max number of Offline-Slave transitions that can happen concurrently in the system

    STATE PRIORITY Helix uses greedy approach to satisfy the state constraints. For example if the state machine configuration says it needs 1 master and 2 slaves but only 1 node is active, Helix must promote it to master. This behavior is achieved by providing the state priority list as MASTER,SLAVE.

    STATE TRANSITION PRIORITY Helix tries to fire as many transitions as possible in parallel to reach the stable state without violating constraints. By default Helix simply sorts the transitions alphabetically and fires as many as it can without violating the constraints. One can control this by overriding the priority order.

    Config management

    Helix allows applications to store application specific properties. The configuration can have different scopes.

    +

    Apart from providing the state machine configuration, one can specify the constraints of states and transitions.

    For example, one can say Master:1, meaning the max number of replicas in Master state at any time is 1, or OFFLINE-SLAVE:5, meaning at most 5 Offline-Slave transitions can happen concurrently in the system.

    STATE PRIORITY

    Helix uses a greedy approach to satisfy the state constraints. For example, if the state machine configuration says it needs 1 master and 2 slaves but only 1 node is active, Helix must promote it to master. This behavior is achieved by providing the state priority list as MASTER,SLAVE.

    STATE TRANSITION PRIORITY

    Helix tries to fire as many transitions as possible in parallel to reach the stable state without violating constraints. By default, Helix simply sorts the transitions alphabetically and fires as many as it can without violating the constraints. One can control this by overriding the priority order.
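
    The greedy use of the state priority list can be sketched as below. This is an illustration only, assuming a per-state upper bound such as MASTER:1 and SLAVE:2; the names GreedyStateAssignment and assign are hypothetical and do not come from the Helix API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Helix code: fills states in priority order until
// either the per-state limit or the list of live nodes is exhausted.
final class GreedyStateAssignment {

    static Map<String, String> assign(List<String> liveNodes,
                                      List<String> statePriority,
                                      Map<String, Integer> maxPerState) {
        Map<String, String> result = new LinkedHashMap<>();
        int next = 0;
        for (String state : statePriority) {
            int limit = maxPerState.getOrDefault(state, 0);
            for (int i = 0; i < limit && next < liveNodes.size(); i++) {
                result.put(liveNodes.get(next++), state);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> limits = new HashMap<>();
        limits.put("MASTER", 1);
        limits.put("SLAVE", 2);
        List<String> priority = Arrays.asList("MASTER", "SLAVE");

        // Only one live node: the highest-priority state wins.
        System.out.println(assign(Arrays.asList("N1"), priority, limits)); // {N1=MASTER}
        System.out.println(assign(Arrays.asList("N1", "N2", "N3"), priority, limits)); // {N1=MASTER, N2=SLAVE, N3=SLAVE}
    }
}
```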

    Config management

    Helix allows applications to store application specific properties. The configuration can have different scopes.

    • Cluster
    • Node specific
    • Resource specific
    • Partition specific
    • -

    Helix also provides notifications when any configs are changed. This allows applications to support dynamic configuration changes.

    See HelixManager.getConfigAccessor for more info

    Intra cluster messaging api

    This is an interesting feature which is quite useful in practice. Often times, nodes in DDS requires a mechanism to interact with each other. One such requirement is a process of bootstrapping a replica.

    Consider a search system use case where the index replica starts up and it does not have an index. One of the commonly used solutions is to get the index from a common location or to copy the index from another replica. Helix provides a messaging api, that can be used to talk to other nodes in the system. The value added that Helix provides here is, message recipient can be specified in terms of resource, partition, state and Helix ensures that the message is delivered to all of the required recipients. In this particular use case, the instance can specify the recipient criteria as all replicas of P1. Since Helix is aware of the global state of the system, it can send the message to appropriate nodes. Once the nodes respond Helix provides the bootstrapping replica with all the responses.

    This is a very generic api and can also be used to schedule various periodic tasks in the cluster like data backups etc. System Admins can also perform adhoc tasks like on demand backup or execute a system command(like rm -rf ;-)) across all nodes.

    +

    Helix also provides notifications when any configs are changed. This allows applications to support dynamic configuration changes.

    See HelixManager.getConfigAccessor for more info

    Intra cluster messaging api

    This is an interesting feature which is quite useful in practice. Often, nodes in a DDS require a mechanism to interact with each other. One such requirement is the process of bootstrapping a replica.

    Consider a search system use case where the index replica starts up and it does not have an index. One of the commonly used solutions is to get the index from a common location or to copy the index from another replica. Helix provides a messaging API that can be used to talk to other nodes in the system. The value Helix adds here is that the message recipient can be specified in terms of resource, partition, and state, and Helix ensures that the message is delivered to all of the required recipients. In this particular use case, the instance can specify the recipient criteria as all replicas of P1. Since Helix is aware of the global state of the system, it can send the message to the appropriate nodes. Once the nodes respond, Helix provides the bootstrapping replica with all the responses.

    This is a very generic api and can also be used to schedule various periodic tasks in the cluster like data backups etc. System Admins can also perform adhoc tasks like on demand backup or execute a system command(like rm -rf ;-)) across all nodes.

          ClusterMessagingService messagingService = manager.getMessagingService();
           //CONSTRUCT THE MESSAGE
           Message requestBackupUriRequest = new Message(
    @@ -216,7 +304,7 @@ P2 -> {N1:S, N2:M}
           requestBackupUriRequest.setMsgState(MessageState.NEW);
           //SET THE RECIPIENT CRITERIA, All nodes that satisfy the criteria will receive the message
           Criteria recipientCriteria = new Criteria();
    -      recipientCriteria.setInstanceName("*");
    +      recipientCriteria.setInstanceName("%");
           recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
           recipientCriteria.setResource("MyDB");
           recipientCriteria.setPartition("");
    @@ -230,13 +318,13 @@ P2 -> {N1:S, N2:M}
           //This will return only after all recipients respond or after timeout.
           int sentMessageCount = messagingService.sendAndWait(recipientCriteria,
               requestBackupUriRequest, responseHandler, timeout);
    -

    See HelixManager.getMessagingService for more info.

    Application specific property storage

    There are several usecases where applications needs support for distributed data structures. Helix uses Zookeeper to store the application data and hence provides notifications when the data changes. One value add Helix provides is the ability to specify cache the data and also write through cache. This is more efficient than reading from ZK every time.

    See HelixManager.getHelixPropertyStore

    Throttling

    Since all state changes in the system are triggered through transitions, Helix can control the number of transitions that can happen in parallel. Some of the transitions may be light weight but some might involve moving data around which is quite expensive. Helix allows applications to set threshold on transitions. The threshold can be set at the multiple scopes.

    +

    See HelixManager.getMessagingService for more info.

    Application specific property storage

    There are several use cases where applications need support for distributed data structures. Helix uses Zookeeper to store the application data and hence provides notifications when the data changes. One value add Helix provides is the ability to cache the data, including a write-through cache. This is more efficient than reading from ZK every time.

    See HelixManager.getHelixPropertyStore

    Throttling

    Since all state changes in the system are triggered through transitions, Helix can control the number of transitions that can happen in parallel. Some transitions may be lightweight, but some might involve moving data around, which is quite expensive. Helix allows applications to set thresholds on transitions. The thresholds can be set at multiple scopes.

    • MessageType e.g STATE_TRANSITION
    • TransitionType e.g SLAVE-MASTER
    • Resource e.g database
    • Node i.e per node max transitions in parallel.
    • -

    See HelixManager.getHelixAdmin.addMessageConstraint()

    Health monitoring and alerting

    This in currently in development mode, not yet productionized.

    Helix provides ability for each node in the system to report health metrics on a periodic basis. Helix supports multiple ways to aggregate these metrics like simple SUM, AVG, EXPONENTIAL DECAY, WINDOW. Helix will only persist the aggregated value. Applications can define threshold on the aggregate values according to the SLAs and when the SLA is violated Helix will fire an alert. Currently Helix only fires an alert but eventually we plan to use this metrics to either mark the node dead or load balance the partitions. This feature will be valuable in for distributed systems that support multi-tenancy and have huge variation in work load patterns. Another place this can be used is to detect skewed partitions and rebalance the cluster.

    This feature is not yet stable and do not recommend to be used in production.

    Controller deployment modes

    Read Architecture wiki for more details on the Role of a controller. In simple words, it basically controls the participants in the cluster by issuing transitions.

    Helix provides multiple options to deploy the controller.

    STANDALONE

    Controller can be started as a separate process to manage a cluster. This is the recommended approach. How ever since one controller can be a single point of failure, multiple controller processes are required for reliability. Even if multiple controllers are running only one will be actively managing the cluster at any time and is decided by a leader election process. If the leader fails, another leader will resume managing the cluster.

    Even though we recommend this method of deployment, it has the drawback of having to manage an additional service for each cluster. See Controller As a Service option.

    EMBEDDED

    If setting up a separate controller process is not viable, then it is possible to embed the controller as a library in each of the participant.

    CONTROLLER AS A SERVICE

    One of the cool feature we added in helix was use a set of controllers to manage a large number of clusters. For example if you have X clusters to be managed, instead of deploying X*3(3 controllers for fault tolerance) controllers for each cluster, one can deploy only 3 controllers. Each controller can manage X/3 clusters. If any controller fails the remaining two will manage X/2 clusters. At LinkedIn, we always deploy controllers in this mode.

    +

    See HelixManager.getHelixAdmin.addMessageConstraint()
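
    To make the per-node scope concrete, here is a minimal sketch of throttling, assuming a single limit on the number of transitions a node may run in parallel. It does not use or reproduce addMessageConstraint(); TransitionThrottle and selectRunnable are hypothetical names, and the other scopes (message type, transition type, resource) are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Helix code: picks the pending transitions that may
// start now without exceeding a per-node parallelism limit.
final class TransitionThrottle {

    // Each pending entry is {node, transition}, e.g. {"N1", "OFFLINE-SLAVE"}.
    static List<String> selectRunnable(List<String[]> pending, int maxPerNode) {
        Map<String, Integer> started = new HashMap<>();
        List<String> runnable = new ArrayList<>();
        for (String[] transition : pending) {
            String node = transition[0];
            int used = started.getOrDefault(node, 0);
            if (used < maxPerNode) {
                started.put(node, used + 1);
                runnable.add(node + ":" + transition[1]);
            }
            // Transitions over the limit stay pending for a later round.
        }
        return runnable;
    }

    public static void main(String[] args) {
        List<String[]> pending = Arrays.asList(
            new String[] {"N1", "OFFLINE-SLAVE"},
            new String[] {"N1", "SLAVE-MASTER"},
            new String[] {"N2", "OFFLINE-SLAVE"});
        // Prints [N1:OFFLINE-SLAVE, N2:OFFLINE-SLAVE]
        System.out.println(selectRunnable(pending, 1));
    }
}
```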

    Health monitoring and alerting

    This is currently in development and not yet production-ready.

    Helix provides the ability for each node in the system to report health metrics on a periodic basis. Helix supports multiple ways to aggregate these metrics, such as simple SUM, AVG, EXPONENTIAL DECAY, and WINDOW. Helix will only persist the aggregated value. Applications can define thresholds on the aggregate values according to their SLAs, and when an SLA is violated Helix will fire an alert. Currently Helix only fires an alert, but eventually we plan to use these metrics to either mark the node dead or load balance the partitions. This feature will be valuable for distributed systems that support multi-tenancy and have huge variation in workload patterns. Another place this can be used is to detect skewed partitions and rebalance the cluster.

    This feature is not yet stable and is not recommended for use in production.

    Controller deployment modes

    Read Architecture wiki for more details on the Role of a controller. In simple words, it basically controls the participants in the cluster by issuing transitions.

    Helix provides multiple options to deploy the controller.

    STANDALONE

    A controller can be started as a separate process to manage a cluster. This is the recommended approach. However, since one controller can be a single point of failure, multiple controller processes are required for reliability. Even if multiple controllers are running, only one will be actively managing the cluster at any time, as decided by a leader election process. If the leader fails, another leader will resume managing the cluster.

    Even though we recommend this method of deployment, it has the drawback of having to manage an additional service for each cluster. See Controller As a Service option.

    EMBEDDED

    If setting up a separate controller process is not viable, then it is possible to embed the controller as a library in each participant.

    CONTROLLER AS A SERVICE

    One of the cool features we added in Helix is the ability to use a set of controllers to manage a large number of clusters. For example, if you have X clusters to manage, instead of deploying X*3 controllers (3 controllers per cluster for fault tolerance), one can deploy only 3 controllers. Each controller then manages X/3 clusters. If any controller fails, the remaining two will each manage X/2 clusters. At LinkedIn, we always deploy controllers in this mode.

    Modified: incubator/helix/site-content/Quickstart.html URL: http://svn.apache.org/viewvc/incubator/helix/site-content/Quickstart.html?rev=1479756&r1=1479755&r2=1479756&view=diff ============================================================================== --- incubator/helix/site-content/Quickstart.html (original) +++ incubator/helix/site-content/Quickstart.html Tue May 7 03:10:53 2013 @@ -1,13 +1,13 @@ - + Apache Helix - @@ -52,6 +52,9 @@
  • Quick Start
  • +
  • Core concept +
  • +
  • Tutorial
  • @@ -162,7 +165,7 @@ -
  • Last Published: 2013-04-27
  • +
  • Last Published: 2013-05-06
  • Added: incubator/helix/site-content/Sample_App.html URL: http://svn.apache.org/viewvc/incubator/helix/site-content/Sample_App.html?rev=1479756&view=auto ============================================================================== --- incubator/helix/site-content/Sample_App.html (added) +++ incubator/helix/site-content/Sample_App.html Tue May 7 03:10:53 2013 @@ -0,0 +1,369 @@ + + + + + + + + + Apache Helix - + + + + + + + + + + + + + + + + + +
    + + + + + + +
    + +

    RabbitMQ Consumer Group

    RabbitMQ is well-known open source software that provides robust messaging for applications.

    One of the commonly implemented recipes using this software is a work queue. http://www.rabbitmq.com/tutorials/tutorial-four-java.html describes the use case where

    +
      +
    • A producer sends a message with a routing key.
    • +
    • The message goes to the queues whose binding key exactly matches the routing key of the message.
    • +
    • There are multiple consumers and each consumer is interested in processing only a subset of the messages by binding to the interested keys
    • +

    The example provided here describes how multiple consumers can be started to process all the tasks.

    While this works, in production systems one needs the following:

    • Ability to handle failures: when a consumer fails, another consumer must be started, or the other consumers must start processing the messages that should have been processed by the failed consumer.
    • When the existing consumers cannot keep up with the task generation rate, new consumers will be added. The tasks must be redistributed among all the consumers.

    In this sample app, we explain how these set of consumers can be grouped together and handle consumer failures and expansion automatically.

    Mapping this usecase to Helix is pretty easy as the binding key/routing key is equivalent to a partition.

    Let's take a real example. Say a topic has 6 partitions (queues), and we have 2 consumers to process all the queues. What we want is for all 6 queues to be evenly divided among the 2 consumers, 3 queues each. Eventually, when the system scales, we add a third consumer to keep up, so each consumer processes tasks from 2 queues. Now say that a consumer fails and the number of active consumers is reduced to 2. Each consumer must then process 3 queues again.

    We showcase how such a dynamic App can be developed using Helix.

    Try it

    Before getting into the details on how to develop such an App using Helix, you can try the following steps to get a feel of it.

    +
    git clone git@github.com:linkedin/helix.git
    +cd helix
    +./build
    +export HELIX_PKG_ROOT=`pwd`/helix-core/target/helix-core-pkg
    +

    OR Download the latest 0.5.28 release tar ball from here

    +
    tar -xzvf helix-core-pkg-0.5.28.tar.gz
    +export HELIX_PKG_ROOT=`pwd`/helix-core-pkg
    +

    Download the rabbitmq-consumer-group recipe from here

    +
    tar -xzvf rabbitmq-consumer-group-0.5.28.tar.gz
    +export HELIX_RABBITMQ_ROOT=`pwd`/rabbitmq-consumer-group/
    +
    +chmod +x $HELIX_PKG_ROOT/bin/*
    +chmod +x $HELIX_RABBITMQ_ROOT/bin/*
    +

    Install Rabbit MQ

    Setting up RabbitMQ on a local box is straightforward. You can find the instructions here http://www.rabbitmq.com/download.html

    Start ZK

    Start zookeeper at port 2199

    +
    $HELIX_PKG_ROOT/bin/start-standalone-zookeeper 2199
    +

    Setup the consumer group cluster

    This sets up the cluster by creating a rabbitmq-consumer-group cluster and adding a topic resource with 6 queues.

    +
    $HELIX_RABBITMQ_ROOT/bin/setup-cluster.sh localhost:2199
    +

    Add consumers

    Start 2 consumers in 2 different terminals. Each consumer is given a unique id.

    +
    //start-consumer.sh zookeeperAddress (e.g. localhost:2181) consumerId rabbitmqServer (e.g. localhost)
    +$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 0 localhost
    +$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 1 localhost
    +

    Start HelixController

    Now start a Helix controller that starts managing the "rabbitmq-consumer-group" cluster.

    +
    $HELIX_RABBITMQ_ROOT/bin/start-cluster-manager.sh localhost:2199
    +

    Send messages to the Topic

    Start sending messages to the topic. This script randomly selects a routing key (1-6) and sends the message to the topic. Based on the key, messages get routed to the appropriate queue.

    +
    $HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 20
    +

    After running this, you should see all 20 messages being processed by 2 consumers.

    Add another consumer

    Once a new consumer is started, Helix detects it. In order to balance the load between 3 consumers, it deallocates 1 partition from each of the existing consumers and allocates it to the new consumer. We see that each consumer is now processing only 2 queues. Helix makes sure that old nodes are asked to stop consuming before the new consumer is asked to start consuming for a given partition. But the transitions for each partition can happen in parallel.

    +
    $HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 2 localhost
    +

    Send messages again to the topic. +$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 100 + You should see that messages are now received by all 3 consumers.
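
    The ordering rule described for adding a consumer (stop on the old owner first, then start on the new owner, independently per partition) can be sketched in plain Java. This is only an illustration, not Helix code: the class HandoffPlanSketch, the method plan, and the consumer names used with it are hypothetical, and Helix's controller computes transitions in its own way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a hypothetical helper, not part of the Helix API.
public class HandoffPlanSketch {

    /**
     * For every partition whose owner changes, returns the ordered steps:
     * the old owner goes OFFLINE first, then the new owner goes ONLINE.
     * Partitions whose owner is unchanged produce no steps.
     */
    static Map<Integer, List<String>> plan(Map<Integer, String> oldOwner,
                                           Map<Integer, String> newOwner) {
        Map<Integer, List<String>> steps = new TreeMap<>();
        for (Map.Entry<Integer, String> e : newOwner.entrySet()) {
            int partition = e.getKey();
            String to = e.getValue();
            String from = oldOwner.get(partition);
            if (to.equals(from)) {
                continue; // same owner, nothing to do
            }
            List<String> ordered = new ArrayList<>();
            if (from != null) {
                // The old consumer must stop before the new one starts.
                ordered.add(from + ": ONLINE->OFFLINE");
            }
            ordered.add(to + ": OFFLINE->ONLINE");
            steps.put(partition, ordered);
        }
        return steps;
    }
}
```

    Each map entry is an independent sequence, which mirrors the point that different partitions may transition in parallel while the two steps for one partition stay ordered.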

    Stop a consumer

    Press Ctrl+C in any consumer terminal. Helix detects the consumer failure and distributes the 2 partitions that were processed by the failed consumer to the remaining 2 active consumers.

    How does it work

    Find the entire code here.

    Cluster setup

    This step creates a znode in ZooKeeper for the cluster and adds the state model. We use the OnlineOffline state model since there is no need for other states: a consumer is either processing a queue or it is not.

    It creates a resource called rabbitmq-consumer-group with 6 partitions. The execution mode is set to AUTO_REBALANCE. This means that Helix controls the assignment of partitions to consumers and automatically distributes the partitions evenly among the active consumers. When a consumer is added or removed, it ensures that only a minimal number of partitions are shuffled.

    +
          zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
    +          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
    +      ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
    +      
    +      // add cluster
    +      admin.addCluster(clusterName, true);
    +
    +      // add state model definition
    +      StateModelConfigGenerator generator = new StateModelConfigGenerator();
    +      admin.addStateModelDef(clusterName, "OnlineOffline",
    +          new StateModelDefinition(generator.generateConfigForOnlineOffline()));
    +
    +      // add resource "topic" which has 6 partitions
    +      String resourceName = "rabbitmq-consumer-group";
    +      admin.addResource(clusterName, resourceName, 6, "OnlineOffline", "AUTO_REBALANCE");
    +
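
    To make the "evenly distributed, minimal shuffling" behavior concrete, here is a small plain-Java sketch. It is not the algorithm Helix uses for AUTO_REBALANCE; the class RebalanceSketch and the method rebalance are hypothetical names, and the approach (keep current partitions up to a per-consumer quota, then hand out the rest) is just one simple way to get an even assignment with few moves. It assumes partition ids run from 0 to numPartitions - 1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a hypothetical helper, not the Helix rebalancer.
public class RebalanceSketch {

    /** Even assignment that keeps existing ownership wherever the quota allows. */
    static Map<String, List<Integer>> rebalance(Map<String, List<Integer>> current,
                                                List<String> liveConsumers,
                                                int numPartitions) {
        Map<String, List<Integer>> next = new LinkedHashMap<>();
        for (String c : liveConsumers) {
            next.put(c, new ArrayList<>());
        }
        int n = liveConsumers.size();
        if (n == 0) {
            return next;
        }
        int base = numPartitions / n;   // every consumer gets at least this many
        int extra = numPartitions % n;  // the first 'extra' consumers get one more
        boolean[] placed = new boolean[numPartitions];

        // Pass 1: live consumers keep what they already own, up to their quota.
        for (int i = 0; i < n; i++) {
            String c = liveConsumers.get(i);
            int quota = base + (i < extra ? 1 : 0);
            List<Integer> owned = current.getOrDefault(c, Collections.<Integer>emptyList());
            for (int p : owned) {
                if (next.get(c).size() < quota && !placed[p]) {
                    next.get(c).add(p);
                    placed[p] = true;
                }
            }
        }
        // Pass 2: partitions left without an owner go to consumers below quota.
        for (int p = 0; p < numPartitions; p++) {
            if (placed[p]) {
                continue;
            }
            for (int i = 0; i < n; i++) {
                String c = liveConsumers.get(i);
                int quota = base + (i < extra ? 1 : 0);
                if (next.get(c).size() < quota) {
                    next.get(c).add(p);
                    placed[p] = true;
                    break;
                }
            }
        }
        return next;
    }
}
```

    With 6 partitions split 3/3 between consumers "0" and "1", adding consumer "2" moves exactly two partitions in this sketch, and removing a consumer afterwards moves only the two partitions it owned.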

    Starting the consumers

    The only things a consumer needs to know are the ZooKeeper address, the cluster name and its consumer id. It does not need to know anything else.

    +
       _manager =
    +          HelixManagerFactory.getZKHelixManager(_clusterName,
    +                                                _consumerId,
    +                                                InstanceType.PARTICIPANT,
    +                                                _zkAddr);
    +
    +      StateMachineEngine stateMach = _manager.getStateMachineEngine();
    +      ConsumerStateModelFactory modelFactory =
    +          new ConsumerStateModelFactory(_consumerId, _mqServer);
    +      stateMach.registerStateModelFactory("OnlineOffline", modelFactory);
    +
    +      _manager.connect();
    +
    +

    Once the consumer has registered the state model and the controller is started, the consumer starts getting callbacks (onBecomeOnlineFromOffline) for the partitions it needs to host. All it needs to do as part of the callback is to start consuming messages from the appropriate queue. Similarly, when the controller deallocates a partition from a consumer, it fires onBecomeOfflineFromOnline for the same partition. As part of this transition, the consumer stops consuming from that queue.

    +
     @Transition(to = "ONLINE", from = "OFFLINE")
    +  public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
    +  {
    +    LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);
    +
    +    if (_thread == null)
    +    {
    +      LOG.debug("Starting ConsumerThread for " + _partition + "...");
    +      _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
    +      _thread.start();
    +      LOG.debug("Starting ConsumerThread for " + _partition + " done");
    +
    +    }
    +  }
    +
    +  @Transition(to = "OFFLINE", from = "ONLINE")
    +  public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
    +      throws InterruptedException
    +  {
    +    LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);
    +
    +    if (_thread != null)
    +    {
    +      LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
    +
    +      _thread.interrupt();
    +      _thread.join(2000);
    +      _thread = null;
    +      LOG.debug("Stopping " +  _consumerId + " for " + _partition + " done");
    +
    +    }
    +  }
    +
    +
    +
    + +
    + + + + \ No newline at end of file Modified: incubator/helix/site-content/Tutorial.html URL: http://svn.apache.org/viewvc/incubator/helix/site-content/Tutorial.html?rev=1479756&r1=1479755&r2=1479756&view=diff ============================================================================== --- incubator/helix/site-content/Tutorial.html (original) +++ incubator/helix/site-content/Tutorial.html Tue May 7 03:10:53 2013 @@ -1,13 +1,13 @@ - + Apache Helix - @@ -52,6 +52,9 @@
  • Quick Start
  • +
  • Core concept +
  • +
  • Tutorial
  • @@ -162,7 +165,7 @@ -
  • Last Published: 2013-04-27
  • +
  • Last Published: 2013-05-06
  • @@ -194,7 +197,7 @@ under the License. -->

    Lets walk throu admin = new ZKHelixAdmin(ZK_ADDRESS); String CLUSTER_NAME = "helix-demo"; //Create cluster namespace in zookeeper - admin.addCluster(clusterName, true); + admin.addCluster(clusterName);

    OR

        ./helix-admin.sh --zkSvr localhost:2199 --addCluster helix-demo 
     

    Configure nodes

    Add new nodes to the cluster and configure them. Each node in the cluster must be uniquely identifiable. The most commonly used convention is hostname:port.

    Modified: incubator/helix/site-content/UseCases.html URL: http://svn.apache.org/viewvc/incubator/helix/site-content/UseCases.html?rev=1479756&r1=1479755&r2=1479756&view=diff ============================================================================== --- incubator/helix/site-content/UseCases.html (original) +++ incubator/helix/site-content/UseCases.html Tue May 7 03:10:53 2013 @@ -1,13 +1,13 @@ - + Apache Helix - @@ -52,6 +52,9 @@
  • Quick Start
  • +
  • Core concept +
  • +
  • Tutorial
  • @@ -162,7 +165,7 @@ -
  • Last Published: 2013-04-27
  • +
  • Last Published: 2013-05-06