Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 348A0D48F for ; Tue, 30 Oct 2012 22:59:42 +0000 (UTC) Received: (qmail 95244 invoked by uid 500); 30 Oct 2012 22:59:41 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 95129 invoked by uid 500); 30 Oct 2012 22:59:41 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 94603 invoked by uid 99); 30 Oct 2012 22:59:40 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Oct 2012 22:59:40 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 60B4550E91; Tue, 30 Oct 2012 22:59:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: brock@apache.org To: commits@flume.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [14/24] git commit: FLUME-1620: Update flume user guide for LoadBalancingSinkProcessor with the backoff changes. Message-Id: <20121030225940.60B4550E91@tyr.zones.apache.org> Date: Tue, 30 Oct 2012 22:59:40 +0000 (UTC) FLUME-1620: Update flume user guide for LoadBalancingSinkProcessor with the backoff changes. (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/94f50ef9 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/94f50ef9 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/94f50ef9 Branch: refs/heads/FLUME-1502 Commit: 94f50ef9c37845964e515c489661df124d19aace Parents: 250c457 Author: Brock Noland Authored: Thu Oct 18 11:58:52 2012 -0500 Committer: Brock Noland Committed: Thu Oct 18 11:58:52 2012 -0500 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeDeveloperGuide.rst | 54 +++++++++++++++++++--- flume-ng-doc/sphinx/FlumeUserGuide.rst | 35 ++++++++------ 2 files changed, 66 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/94f50ef9/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst index 8b73a06..25ded18 100644 --- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst +++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst @@ -197,7 +197,7 @@ Failover handler '''''''''''''''' This class wraps the Avro RPC client to provide failover handling capability to -clients. This takes a list of host/ports of the Flume agent. If there’s an +clients. This takes a whitespace separated list of host/ports of the Flume agents. If there’s an error in communicating the current agent, then it automatically falls back to the next agent in the list: @@ -212,13 +212,53 @@ the next agent in the list: // address/port pair for each host props.put("hosts.host1", host1 + ":" + port1); - props.put("hosts.host1", host2 + ":" + port2); - props.put("hosts.host1", host3 + ":" + port3); + props.put("hosts.host2", host2 + ":" + port2); + props.put("hosts.host3", host3 + ":" + port3); // create the client with failover properties - client = (FailoverRpcClient); - RpcClientFactory.getInstance(props); + client = RpcClientFactory.getInstance(props); + +LoadBalancing Rpc Client +'''''''''''''''''''''''' + +Flume SDK also supports an RpcClient which load balances between multiple +hosts. This takes a whitespace separated list of host:port of Flume agents. This +client can be configured to either load balance or randomly select among the +configured agents. You can also specify a class that implements the +``LoadBalancingRpcClient$HostSelector`` interface in the properties object to +generate the selection order. + +If ``backoff`` is enabled, the client will blacklist +hosts that fail, removing them for selection for a given timeout. When the +timeout ends, if the host is still unresponsive timeout is increased +exponentially to avoid potentially getting stuck in long waits on unresponsive +hosts. +The maximum backoff time can be configured by setting ``maxBackoff`` - in milliseconds. +There is currently no default maximum back off time, so the backoff will increase +exponentially unless this property is set. +.. code-block:: java + + // Setup properties for the load balancing + Properties props = new Properties(); + props.put("client.type", "DEFAULT_LOADBALANCE"); + + // list of hosts + props.put("hosts", "host1 host2 host3"); + + // address/port pair for each host + props.put("hosts.host1", host1 + ":" + port1); + props.put("hosts.host2", host2 + ":" + port2); + props.put("hosts.host3", host3 + ":" + port3); + + props.put("host-selector","random"); //for random order + //props.put("host-selector","round_robin"); //for round robin order + props.put("backoff", "true"); //disabled by default. + + props.put("maxBackoff", "10000"); //default = No Maximum. + + // create the client with load balancing properties + client = RpcClientFactory.getInstance(props); Transaction interface ~~~~~~~~~~~~~~~~~~~~~ @@ -280,7 +320,7 @@ configuration settings: public class FooSink extends AbstractSink implements Configurable { @Override public void configure(Context context) { - some_Param = context.get("some_param", String.class); + some_Param = context.getString("some_param", "default_value"); // process some_param … } @Override @@ -336,7 +376,7 @@ data: public class BarSource extends AbstractSource implements Configurable, PollableSource { @Override public void configure(Context context) { - some_Param = context.get("some_param", String.class); + some_Param = context.getString("some_param", "default_value"); // process some_param … } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/94f50ef9/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 8160ca4..86bbd1a 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -894,11 +894,11 @@ Example for agent named **agent_foo**: agent_foo.channels = memoryChannel-1 agent_foo.sources.legacysource-1.type = your.namespace.YourClass agent_foo.sources.legacysource-1.channels = memoryChannel-1 - + Scribe Source ~~~~~~~~~~~~~ -Scribe is another type of ingest system. To adopt existing Scribe ingest system, +Scribe is another type of ingest system. To adopt existing Scribe ingest system, Flume should use ScribeSource based on Thrift with compatible transfering protocol. The deployment of Scribe please following guide from Facebook. Required properties are in **bold**. @@ -1602,11 +1602,10 @@ Load balancing Sink Processor Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed. Implementation supports distributing load using -either via ``ROUND_ROBIN``, ``RANDOM``, ``ROUND_ROBIN_BACKOFF``, or -``RANDOM_BACKOFF`` selection mechanisms. The choice of selection mechanism -defaults to ``ROUND_ROBIN`` type, but can be overridden -via configuration. Custom selection mechanisms are supported via custom -classes that inherits from ``LoadBalancingSelector``. +either via ``ROUND_ROBIN`` or ``RANDOM`` selection mechanisms. +The choice of selection mechanism defaults to ``ROUND_ROBIN`` type, +but can be overridden via configuration. Custom selection mechanisms are +supported via custom classes that inherits from ``AbstractSinkSelector``. When invoked, this selector picks the next sink using its configured selection mechanism and invokes it. For ROUND_ROBIN and RANDOM In case the selected sink @@ -1614,7 +1613,9 @@ fails to deliver the event, the processor picks the next available sink via its configured selection mechanism. This implementation does not blacklist the failing sink and instead continues to optimistically attempt every available sink. If all sinks invocations result in failure, the selector -propagates the failure to the sink runner. The BACKOFF variants will blacklist +propagates the failure to the sink runner. + +If ``backoff`` is enabled, the sink processor will blacklist sinks that fail, removing them for selection for a given timeout. When the timeout ends, if the sink is still unresponsive timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive @@ -1624,16 +1625,16 @@ sinks. Required properties are in **bold**. -==================================== =============== =============================================================== +==================================== =============== ========================================================================== Property Name Default Description -==================================== =============== =============================================================== -**processor.sinks** -- Space separated list of sinks that are participating in the group +==================================== =============== ========================================================================== +**processor.sinks** -- Space separated list of sinks that are participating in the group **processor.type** ``default`` The component type name, needs to be ``load_balance`` +processor.backoff true Should failed sinks be backed off exponentially. processor.selector ``ROUND_ROBIN`` Selection mechanism. Must be either ``ROUND_ROBIN``, ``RANDOM`` - ``ROUND_ROBIN_BACKOFF``, ``RANDOM_BACKOFF`` or custom FQDN to - class that inherits from ``LoadBalancingSelector`` + or custom FQDN to class that inherits from ``AbstractSinkSelector`` processor.selector.maxBackoffMillis 30000 used by backoff selectors to limit exponential backoff in miliseconds -==================================== =============== =============================================================== +==================================== =============== ========================================================================== Example for agent named **agent_foo**: @@ -1642,8 +1643,10 @@ Example for agent named **agent_foo**: agent_foo.sinkgroups = group1 agent_foo.sinkgroups.group1.sinks = sink1 sink2 agent_foo.sinkgroups.group1.processor.type = load_balance + agent_foo.sinkgroups.group1.processor.backoff = true agent_foo.sinkgroups.group1.processor.selector = random + Custom Sink Processor ~~~~~~~~~~~~~~~~~~~~~ @@ -1679,8 +1682,8 @@ are named components, here is an example of how they are created through configu Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves configurable and can be passed configuration values just like they are passed to any other configurable component. In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor -are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) -or the alias ``TIMESTAMP``. If you have multiple collectors writing to the same HDFS path then you could also use +are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) +or the alias ``TIMESTAMP``. If you have multiple collectors writing to the same HDFS path then you could also use the HostInterceptor. Timestamp Interceptor