Subject: svn commit: r1765259 [2/3] - in /flume/site/trunk/content/sphinx: FlumeDeveloperGuide.rst FlumeUserGuide.rst download.rst index.rst releases/1.6.0.rst releases/1.7.0.rst releases/index.rst
Date: Mon, 17 Oct 2016 12:34:05 -0000
To: commits@flume.apache.org
From: bessbd@apache.org
Reply-To: dev@flume.apache.org
Message-Id: <20161017123405.275C63A17FE@svn01-us-west.apache.org>

Modified: flume/site/trunk/content/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/site/trunk/content/sphinx/FlumeUserGuide.rst?rev=1765259&r1=1765258&r2=1765259&view=diff
==============================================================================
--- flume/site/trunk/content/sphinx/FlumeUserGuide.rst (original)
+++ flume/site/trunk/content/sphinx/FlumeUserGuide.rst Mon Oct 17 12:34:04 2016
@@ -15,7 +15,7 @@
 ======================================
-Flume 1.6.0 User Guide
+Flume 1.7.0 User Guide
====================================== Introduction @@ -28,29 +28,29 @@ Apache Flume is a distributed, reliable, collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store. -The use of Apache Flume is not only restricted to log data aggregation. +The use of Apache Flume is not only restricted to log data aggregation. Since data sources are customizable, Flume can be used to transport massive quantities -of event data including but not limited to network traffic data, social-media-generated data, +of event data including but not limited to network traffic data, social-media-generated data, email messages and pretty much any data source possible. Apache Flume is a top level project at the Apache Software Foundation. There are currently two release code lines available, versions 0.9.x and 1.x. -Documentation for the 0.9.x track is available at +Documentation for the 0.9.x track is available at `the Flume 0.9.x User Guide `_. This documentation applies to the 1.4.x track. -New and existing users are encouraged to use the 1.x releases so as to -leverage the performance improvements and configuration flexibilities available +New and existing users are encouraged to use the 1.x releases so as to +leverage the performance improvements and configuration flexibilities available in the latest architecture. System Requirements ------------------- -#. Java Runtime Environment - Java 1.6 or later (Java 1.7 Recommended) +#. Java Runtime Environment - Java 1.7 or later #. Memory - Sufficient memory for configurations used by sources, channels or sinks #. Disk Space - Sufficient disk space for configurations used by channels or sinks #. Directory Permissions - Read/Write permissions for directories used by agent @@ -234,6 +234,36 @@ The original Flume terminal will output Congratulations - you've successfully configured and deployed a Flume agent! Subsequent sections cover agent configuration in much more detail. +Logging raw data +~~~~~~~~~~~~~~~~ + + +Logging the raw stream of data flowing through the ingest pipeline is not desired behaviour in +many production environments because this may result in leaking sensitive data or security related +configurations, such as secret keys, to Flume log files. +By default, Flume will not log such information. On the other hand, if the data pipeline is broken, +Flume will attempt to provide clues for debugging the problem. + +One way to debug problems with event pipelines is to set up an additional `Memory Channel`_ +connected to a `Logger Sink`_, which will output all event data to the Flume logs. +In some situations, however, this approach is insufficient. + +In order to enable logging of event- and configuration-related data, some Java system properties +must be set in addition to log4j properties. + +To enable configuration-related logging, set the Java system property +``-Dorg.apache.flume.log.printconfig=true``. This can either be passed on the command line or by +setting this in the ``JAVA_OPTS`` variable in *flume-env.sh*. + +To enable data logging, set the Java system property ``-Dorg.apache.flume.log.rawdata=true`` +in the same way described above. For most components, the log4j logging level must also be set to +DEBUG or TRACE to make event-specific logging appear in the Flume logs. 
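For instance, to set both properties permanently via ``JAVA_OPTS``, a minimal sketch of the relevant *flume-env.sh* line (assuming the conventional *conf/flume-env.sh* location) could be::

    # in conf/flume-env.sh (the exact path depends on the installation layout)
    JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true"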
+ +Here is an example of enabling both configuration logging and raw data logging while also +setting the Log4j loglevel to DEBUG for console output:: + + $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true + Zookeeper based Configuration ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -900,7 +930,7 @@ Property Name Default **channels** -- **type** -- The component type name, needs to be ``jms`` **initialContextFactory** -- Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory -**connectionFactory** -- The JNDI name the connection factory shoulld appear as +**connectionFactory** -- The JNDI name the connection factory should appear as **providerURL** -- The JMS provider URL **destinationName** -- Destination name **destinationType** -- Destination type (queue or topic) @@ -976,48 +1006,60 @@ Despite the reliability guarantees of th cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components. -==================== ============== ========================================================== -Property Name Default Description -==================== ============== ========================================================== -**channels** -- -**type** -- The component type name, needs to be ``spooldir``. -**spoolDir** -- The directory from which to read files from. -fileSuffix .COMPLETED Suffix to append to completely ingested files -deletePolicy never When to delete completed files: ``never`` or ``immediate`` -fileHeader false Whether to add a header storing the absolute path filename. -fileHeaderKey file Header key to use when appending absolute path filename to event header. -basenameHeader false Whether to add a header storing the basename of the file. -basenameHeaderKey basename Header Key to use when appending basename of file to event header. -ignorePattern ^$ Regular expression specifying which files to ignore (skip) -trackerDir .flumespool Directory to store metadata related to processing of files. - If this path is not an absolute path, then it is interpreted as relative to the spoolDir. -consumeOrder oldest In which order files in the spooling directory will be consumed ``oldest``, - ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified - time of the files will be used to compare the files. In case of a tie, the file - with smallest laxicographical order will be consumed first. In case of ``random`` any - file will be picked randomly. When using ``oldest`` and ``youngest`` the whole - directory will be scanned to pick the oldest/youngest file, which might be slow if there - are a large number of files, while using ``random`` may cause old files to be consumed - very late if new files keep coming in the spooling directory. -maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter. -batchSize 100 Granularity at which to batch transfer to the channel -inputCharset UTF-8 Character set used by deserializers that treat the input file as text. -decodeErrorPolicy ``FAIL`` What to do when we see a non-decodable character in the input file. - ``FAIL``: Throw an exception and fail to parse the file. 
- ``REPLACE``: Replace the unparseable character with the "replacement character" char, - typically Unicode U+FFFD. - ``IGNORE``: Drop the unparseable character sequence. -deserializer ``LINE`` Specify the deserializer used to parse the file into events. - Defaults to parsing each line as an event. The class specified must implement - ``EventDeserializer.Builder``. -deserializer.* Varies per event deserializer. -bufferMaxLines -- (Obselete) This option is now ignored. -bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. -selector.type replicating replicating or multiplexing -selector.* Depends on the selector.type value -interceptors -- Space-separated list of interceptors +======================== ============== ========================================================== +Property Name Default Description +======================== ============== ========================================================== +**channels** -- +**type** -- The component type name, needs to be ``spooldir``. +**spoolDir** -- The directory from which to read files. +fileSuffix .COMPLETED Suffix to append to completely ingested files +deletePolicy never When to delete completed files: ``never`` or ``immediate`` +fileHeader false Whether to add a header storing the absolute path filename. +fileHeaderKey file Header key to use when appending absolute path filename to event header. +basenameHeader false Whether to add a header storing the basename of the file. +basenameHeaderKey basename Header Key to use when appending basename of file to event header. +includePattern ^.*$ Regular expression specifying which files to include. + It can be used together with ``ignorePattern``. + If a file matches both ``ignorePattern`` and ``includePattern`` regex, + the file is ignored. +ignorePattern ^$ Regular expression specifying which files to ignore (skip). + It can be used together with ``includePattern``. + If a file matches both ``ignorePattern`` and ``includePattern`` regex, + the file is ignored. +trackerDir .flumespool Directory to store metadata related to processing of files. + If this path is not an absolute path, then it is interpreted as relative to the spoolDir. +consumeOrder oldest In which order files in the spooling directory will be consumed ``oldest``, + ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified + time of the files will be used to compare the files. In case of a tie, the file + with smallest lexicographical order will be consumed first. In case of ``random`` any + file will be picked randomly. When using ``oldest`` and ``youngest`` the whole + directory will be scanned to pick the oldest/youngest file, which might be slow if there + are a large number of files, while using ``random`` may cause old files to be consumed + very late if new files keep coming in the spooling directory. +pollDelay 500 Delay (in milliseconds) used when polling for new files. +recursiveDirectorySearch false Whether to monitor subdirectories for new files to read. +maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to + write to the channel(s) if the channel is full. The source will start at + a low backoff and increase it exponentially each time the channel throws a + ChannelException, up to the value specified by this parameter. +batchSize 100 Granularity at which to batch transfer to the channel +inputCharset UTF-8 Character set used by deserializers that treat the input file as text.
+decodeErrorPolicy ``FAIL`` What to do when we see a non-decodable character in the input file. + ``FAIL``: Throw an exception and fail to parse the file. + ``REPLACE``: Replace the unparseable character with the "replacement character" char, + typically Unicode U+FFFD. + ``IGNORE``: Drop the unparseable character sequence. +deserializer ``LINE`` Specify the deserializer used to parse the file into events. + Defaults to parsing each line as an event. The class specified must implement + ``EventDeserializer.Builder``. +deserializer.* Varies per event deserializer. +bufferMaxLines -- (Obsolete) This option is now ignored. +bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. +selector.type replicating replicating or multiplexing +selector.* Depends on the selector.type value +interceptors -- Space-separated list of interceptors interceptors.* -==================== ============== ========================================================== +======================== ============== ========================================================== Example for an agent named agent-1: @@ -1090,16 +1132,76 @@ Property Name Default deserializer.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request ========================== ================== ======================================================================= +Taildir Source +~~~~~~~~~~~~~~~~~~~~~~~~~ +.. note:: **This source is provided as a preview feature. It does not work on Windows.** + +Watch the specified files, and tail them in nearly real-time once new lines appended to each file are detected. +If new lines are still being written, this source will retry reading them while waiting for the write to complete. + +This source is reliable and will not miss data even when the tailing files rotate. +It periodically writes the last read position of each file in the given position file in JSON format. +If Flume is stopped or down for some reason, it can restart tailing from the position written in the existing position file. + +In other use cases, this source can also start tailing from an arbitrary position for each file using the given position file. +When there is no position file on the specified path, it will start tailing from the first line of each file by default. + +Files will be consumed in order of their modification time. The file with the oldest modification time will be consumed first. + +This source does not rename, delete, or otherwise modify the file being tailed. +Currently this source does not support tailing binary files. It reads text files line by line. + +=================================== ============================== =================================================== +Property Name Default Description +=================================== ============================== =================================================== +**channels** -- +**type** -- The component type name, needs to be ``TAILDIR``. +**filegroups** -- Space-separated list of file groups. Each file group indicates a set of files to be tailed. +**filegroups.<filegroupName>** -- Absolute path of the file group. Regular expressions (and not file system patterns) can be used for the filename only. +positionFile ~/.flume/taildir_position.json File in JSON format to record the inode, the absolute path and the last position of each tailing file. +headers.<filegroupName>.<headerKey> -- Header value which is set with the given header key. Multiple headers can be specified for one file group.
+byteOffsetHeader false Whether to add the byte offset of a tailed line to a header called 'byteoffset'. +skipToEnd false Whether to skip the position to EOF in the case of files not written on the position file. +idleTimeout 120000 Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it. +writePosInterval 3000 Interval time (ms) to write the last position of each file on the position file. +batchSize 100 Max number of lines to read and send to the channel at a time. Using the default is usually fine. +backoffSleepIncrement 1000 The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. +maxBackoffSleep 5000 The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. +cachePatternMatching true Listing directories and applying the filename regex pattern may be time consuming for directories + containing thousands of files. Caching the list of matching files can improve performance. + The order in which files are consumed will also be cached. + Requires that the file system keeps track of modification times with at least a 1-second granularity. +fileHeader false Whether to add a header storing the absolute path filename. +fileHeaderKey file Header key to use when appending absolute path filename to event header. +=================================== ============================== =================================================== + +Example for agent named a1: + +.. code-block:: properties + + a1.sources = r1 + a1.channels = c1 + a1.sources.r1.type = TAILDIR + a1.sources.r1.channels = c1 + a1.sources.r1.positionFile = /var/log/flume/taildir_position.json + a1.sources.r1.filegroups = f1 f2 + a1.sources.r1.filegroups.f1 = /var/log/test1/example.log + a1.sources.r1.headers.f1.headerKey1 = value1 + a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.* + a1.sources.r1.headers.f2.headerKey1 = value2 + a1.sources.r1.headers.f2.headerKey2 = value2-2 + a1.sources.r1.fileHeader = true + Twitter 1% firehose Source (experimental) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. warning:: - This source is hightly experimental and may change between minor versions of Flume. + This source is highly experimental and may change between minor versions of Flume. Use at your own risk. Experimental source that connects via Streaming API to the 1% sample twitter firehose, continously downloads tweets, converts them to Avro format and -sends Avro events to a downstream Flume sink. Requires the consumer and +sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are in **bold**. @@ -1111,7 +1213,7 @@ Property Name Default Desc **consumerKey** -- OAuth consumer key **consumerSecret** -- OAuth consumer secret **accessToken** -- OAuth access token -**accessTokenSecret** -- OAuth toekn secret +**accessTokenSecret** -- OAuth token secret maxBatchSize 1000 Maximum number of twitter messages to put in a single batch maxBatchDurationMillis 1000 Maximum number of milliseconds to wait before closing a batch ====================== =========== =================================================== @@ -1134,57 +1236,224 @@ Example for agent named a1: Kafka Source ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic. +Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. 
If you have multiple Kafka sources running, you can configure them with the same Consumer Group -so each will read a unique set of partitions for the topic. +so each will read a unique set of partitions for the topics. +================================== =========== =================================================== +Property Name Default Description +================================== =========== =================================================== +**channels** -- +**type** -- The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource`` +**kafka.bootstrap.servers** -- List of brokers in the Kafka cluster used by the source +kafka.consumer.group.id flume Unique identifier of the consumer group. Setting the same id in multiple sources or agents + indicates that they are part of the same consumer group +**kafka.topics** -- Comma-separated list of topics the Kafka consumer will read messages from. +**kafka.topics.regex** -- Regex that defines the set of topics the source is subscribed to. This property has higher priority + than ``kafka.topics`` and overrides ``kafka.topics`` if it exists. +batchSize 1000 Maximum number of messages written to Channel in one batch +batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel. + The batch will be written whenever the size or the time limit is reached, whichever comes first. +backoffSleepIncrement 1000 Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. + Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for + ingestion use cases but a lower value may be required for low latency operations with + interceptors. +maxBackoffSleep 5000 Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is + ideal for ingestion use cases but a lower value may be required for low latency operations + with interceptors. +useFlumeEventFormat false By default events are taken as bytes from the Kafka topic directly into the event body. Set to + true to read events as the Flume Avro binary format. Used in conjunction with the same property + on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve + any Flume headers sent on the producing side. +migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. + This should be true to support seamless Kafka client migration from older versions of Flume. + Once migrated this can be set to false, though that should generally not be required. + If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset + defines how offsets are handled. + Check `Kafka documentation `_ for details +kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if connecting to Kafka using some level of security. See below for additional info on secure setup. +*more consumer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security `_ for additional + properties that need to be set on the consumer. +Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. Any consumer property supported + by Kafka can be used. The only requirement is to prepend the property name with the prefix + ``kafka.consumer``. + For example: ``kafka.consumer.auto.offset.reset`` +================================== =========== =================================================== +..
note:: The Kafka Source overrides two Kafka consumer parameters: + auto.commit.enable is set to "false" by the source and every batch is committed. Kafka source guarantees at least once + strategy of messages retrieval. The duplicates can be present when the source starts. + The Kafka Source also provides defaults for the key.deserializer(org.apache.kafka.common.serialization.StringSerializer) + and value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended. -=============================== =========== =================================================== -Property Name Default Description -=============================== =========== =================================================== -**channels** -- -**type** -- The component type name, needs to be ``org.apache.flume.source.kafka,KafkaSource`` -**zookeeperConnect** -- URI of ZooKeeper used by Kafka cluster -**groupId** flume Unique identified of consumer group. Setting the same id in multiple sources or agents - indicates that they are part of the same consumer group -**topic** -- Kafka topic we'll read messages from. At the time, this is a single topic only. -batchSize 1000 Maximum number of messages written to Channel in one batch -batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel - The batch will be written whenever the first of size and time will be reached. -backoffSleepIncrement 1000 Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. - Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for - ingestion use cases but a lower value may be required for low latency operations with - interceptors. -maxBackoffSleep 5000 Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is - ideal for ingestion use cases but a lower value may be required for low latency operations - with interceptors. -Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. Any producer property supported - by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``. - For example: kafka.consumer.timeout.ms - Check `Kafka documentation ` for details -=============================== =========== =================================================== +Deprecated Properties -.. note:: The Kafka Source overrides two Kafka consumer parameters: - auto.commit.enable is set to "false" by the source and we commit every batch. For improved performance - this can be set to "true", however, this can lead to loss of data - consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive - setting this to a higher value can reduce CPU utilization (we'll poll Kafka in less of a tight loop), but also means - higher latency in writing batches to channel (since we'll wait longer for data to arrive). +=============================== =================== ============================================================================================= +Property Name Default Description +=============================== =================== ============================================================================================= +topic -- Use kafka.topics +groupId flume Use kafka.consumer.group.id +zookeeperConnect -- Is no longer supported by kafka consumer client since 0.9.x. 
Use kafka.bootstrap.servers + to establish connection with kafka cluster +=============================== =================== ============================================================================================= +Example for topic subscription by comma-separated topic list. -Example for agent named tier1: +.. code-block:: properties + + tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource + tier1.sources.source1.channels = channel1 + tier1.sources.source1.batchSize = 5000 + tier1.sources.source1.batchDurationMillis = 2000 + tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 + tier1.sources.source1.kafka.topics = test1, test2 + tier1.sources.source1.kafka.consumer.group.id = custom.g.id + +Example for topic subscription by regex .. code-block:: properties tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 - tier1.sources.source1.zookeeperConnect = localhost:2181 - tier1.sources.source1.topic = test1 - tier1.sources.source1.groupId = flume - tier1.sources.source1.kafka.consumer.timeout.ms = 100 + tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 + tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$ + # the default kafka.consumer.group.id=flume is used + + +**Security and Kafka Source:** + +Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. +For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0. + +As of now data encryption is solely provided by SSL/TLS. + +Setting ``kafka.consumer.security.protocol`` to any of the following value means: + +- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption +- **SASL_SSL** - Kerberos or plaintext authentication with data encryption +- **SSL** - TLS based encryption with optional authentication. + +.. warning:: + There is a performance degradation when SSL is enabled, + the magnitude of which depends on the CPU type and the JVM implementation. + Reference: `Kafka security overview `_ + and the jira for tracking this issue: + `KAFKA-2561 `_ + + +**TLS and Kafka Source:** + +Please read the steps described in `Configuring Kafka Clients SSL `_ +to learn about additional configuration settings for fine tuning for example any of the following: +security provider, cipher suites, enabled protocols, truststore or keystore types. + +Example configuration with server side authentication and data encryption. + +.. code-block:: properties + + a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel + a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.channels.channel1.kafka.topic = channel1 + a1.channels.channel1.kafka.consumer.group.id = flume-consumer + a1.channels.channel1.kafka.consumer.security.protocol = SSL + a1.channels.channel1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks + a1.channels.channel1.kafka.consumer.ssl.truststore.password= + + +Note: By default the property ``ssl.endpoint.identification.algorithm`` +is not defined, so hostname verification is not performed. +In order to enable hostname verification, set the following properties + +.. 
code-block:: properties + + a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS + +Once enabled, clients will verify the server's fully qualified domain name (FQDN) +against one of the following two fields: + +#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 +#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6 + +If client side authentication is also required then additionally the following should be added to Flume agent configuration. +Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either +individually or by their signature chain. Common example is to sign each client certificate by a single Root CA +which in turn is trusted by Kafka brokers. + +.. code-block:: properties + + a1.channels.channel1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks + a1.channels.channel1.kafka.consumer.ssl.keystore.password= + +If keystore and key use different password protection then ``ssl.key.password`` property will +provide the required additional secret for both consumer keystores: + +.. code-block:: properties + + a1.channels.channel1.kafka.consumer.ssl.key.password= + + +**Kerberos and Kafka Source:** + +To use Kafka source with a Kafka cluster secured with Kerberos, set the ``consumer.security.protocol`` properties noted above for consumer. +The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file's "KafkaClient" section. "Client" section describes the Zookeeper connection if needed. +See `Kafka doc `_ +for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh: + +.. code-block:: properties + + JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf" + JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf" +Example secure configuration using SASL_PLAINTEXT: +.. code-block:: properties + + a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel + a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.channels.channel1.kafka.topic = channel1 + a1.channels.channel1.kafka.consumer.group.id = flume-consumer + a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT + a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI + a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka + +Example secure configuration using SASL_SSL: + +.. code-block:: properties + + a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel + a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.channels.channel1.kafka.topic = channel1 + a1.channels.channel1.kafka.consumer.group.id = flume-consumer + a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL + a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI + a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka + a1.channels.channel1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks + a1.channels.channel1.kafka.consumer.ssl.truststore.password= + + +Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) +in Kafka documentation of `SASL configuration `_. +Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. 
+This won't be needed unless you require offset migration, or you require this section for other secure components. +Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files. + +.. code-block:: javascript + + Client { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/keytabs/flume.keytab" + principal="flume/flumehost1.example.com@YOURKERBEROSREALM"; + }; + + KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/keytabs/flume.keytab" + principal="flume/flumehost1.example.com@YOURKERBEROSREALM"; + }; NetCat Source @@ -1221,27 +1490,28 @@ Example for agent named a1: a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 - a1.sources.r1.bind = 6666 + a1.sources.r1.port = 6666 a1.sources.r1.channels = c1 Sequence Generator Source ~~~~~~~~~~~~~~~~~~~~~~~~~ -A simple sequence generator that continuously generates events with a counter -that starts from 0 and increments by 1. Useful mainly for testing. -Required properties are in **bold**. - -============== =========== ======================================== -Property Name Default Description -============== =========== ======================================== +A simple sequence generator that continuously generates events with a counter that starts from 0, +increments by 1 and stops at totalEvents. Retries when it can't send events to the channel. Useful +mainly for testing. Required properties are in **bold**. + +============== =============== ======================================== +Property Name Default Description +============== =============== ======================================== **channels** -- -**type** -- The component type name, needs to be ``seq`` -selector.type replicating or multiplexing -selector.* replicating Depends on the selector.type value -interceptors -- Space-separated list of interceptors +**type** -- The component type name, needs to be ``seq`` +selector.type replicating or multiplexing +selector.* replicating Depends on the selector.type value +interceptors -- Space-separated list of interceptors interceptors.* batchSize 1 -============== =========== ======================================== +totalEvents Long.MAX_VALUE Number of unique events sent by the source. +============== =============== ======================================== Example for agent named a1: @@ -1660,34 +1930,39 @@ required. The following are the escape sequences supported: -========= ================================================= -Alias Description -========= ================================================= -%{host} Substitute value of event header named "host". Arbitrary header names are supported. -%t Unix time in milliseconds -%a locale's short weekday name (Mon, Tue, ...) -%A locale's full weekday name (Monday, Tuesday, ...) -%b locale's short month name (Jan, Feb, ...) -%B locale's long month name (January, February, ...) 
-%c locale's date and time (Thu Mar 3 23:05:25 2005) -%d day of month (01) -%e day of month without padding (1) -%D date; same as %m/%d/%y -%H hour (00..23) -%I hour (01..12) -%j day of year (001..366) -%k hour ( 0..23) -%m month (01..12) -%n month without padding (1..12) -%M minute (00..59) -%p locale's equivalent of am or pm -%s seconds since 1970-01-01 00:00:00 UTC -%S second (00..59) -%y last two digits of year (00..99) -%Y year (2010) -%z +hhmm numeric timezone (for example, -0400) -========= ================================================= +=============== ================================================= +Alias Description +=============== ================================================= +%{host} Substitute value of event header named "host". Arbitrary header names are supported. +%t Unix time in milliseconds +%a locale's short weekday name (Mon, Tue, ...) +%A locale's full weekday name (Monday, Tuesday, ...) +%b locale's short month name (Jan, Feb, ...) +%B locale's long month name (January, February, ...) +%c locale's date and time (Thu Mar 3 23:05:25 2005) +%d day of month (01) +%e day of month without padding (1) +%D date; same as %m/%d/%y +%H hour (00..23) +%I hour (01..12) +%j day of year (001..366) +%k hour ( 0..23) +%m month (01..12) +%n month without padding (1..12) +%M minute (00..59) +%p locale's equivalent of am or pm +%s seconds since 1970-01-01 00:00:00 UTC +%S second (00..59) +%y last two digits of year (00..99) +%Y year (2010) +%z +hhmm numeric timezone (for example, -0400) +%[localhost] Substitute the hostname of the host where the agent is running +%[IP] Substitute the IP address of the host where the agent is running +%[FQDN] Substitute the canonical hostname of the host where the agent is running +=============== ================================================= +Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's ability to obtain the +hostname, which may fail in some networking environments. The file in use will have the name mangled to include ".tmp" at the end. Once the file is closed, this extension is removed. This allows excluding partially @@ -1773,8 +2048,7 @@ This sink streams events containing deli Events are written using Hive transactions. As soon as a set of events are committed to Hive, they become immediately visible to Hive queries. Partitions to which flume will stream to can either be pre-created or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to -corresponding columns in the Hive table. **This sink is provided as a preview feature and not recommended -for use in production.** +corresponding columns in the Hive table. ====================== ============ ====================================================================== Name Default Description @@ -1912,8 +2186,9 @@ accept tab separated input containing th Logger Sink ~~~~~~~~~~~ -Logs event at INFO level. Typically useful for testing/debugging purpose. -Required properties are in **bold**. +Logs event at INFO level. Typically useful for testing/debugging purpose. Required properties are +in **bold**. This sink is the only exception which doesn't require the extra configuration +explained in the `Logging raw data`_ section. ============== ======= =========================================== Property Name Default Description @@ -2065,16 +2340,19 @@ File Roll Sink Stores events on the local filesystem. Required properties are in **bold**. 
-=================== ======= ====================================================================================================================== -Property Name Default Description -=================== ======= ====================================================================================================================== -**channel** -- -**type** -- The component type name, needs to be ``file_roll``. -**sink.directory** -- The directory where files will be stored -sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. -sink.serializer TEXT Other possible options include ``avro_event`` or the FQCN of an implementation of EventSerializer.Builder interface. -batchSize 100 -=================== ======= ====================================================================================================================== +========================== ======= ====================================================================================================================== +Property Name Default Description +========================== ======= ====================================================================================================================== +**channel** -- +**type** -- The component type name, needs to be ``file_roll``. +**sink.directory** -- The directory where files will be stored +sink.pathManager DEFAULT The PathManager implementation to use. +sink.pathManager.extension -- The file extension if the default PathManager is used. +sink.pathManager.prefix -- A character string to add to the beginning of the file name if the default PathManager is used +sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. +sink.serializer TEXT Other possible options include ``avro_event`` or the FQCN of an implementation of EventSerializer.Builder interface. +batchSize 100 +========================== ======= ====================================================================================================================== Example for agent named a1: @@ -2230,19 +2508,19 @@ This sink extracts data from Flume event This sink is well suited for use cases that stream raw data into HDFS (via the HdfsSink) and simultaneously extract, transform and load the same data into Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to Search applications. -The ETL functionality is customizable using a `morphline configuration file `_ that defines a chain of transformation commands that pipe event records from one command to another. +The ETL functionality is customizable using a `morphline configuration file `_ that defines a chain of transformation commands that pipe event records from one command to another. Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline command is a bit like a Flume Interceptor. Morphlines can be embedded into Hadoop components such as Flume. Commands to parse and transform a set of standard data formats such as log files, Avro, CSV, Text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for additional data formats can be added as morphline plugins. 
Any kind of data format can be indexed and any Solr documents for any kind of Solr schema can be generated, and any custom ETL logic can be registered and executed. -Morphlines manipulate continuous streams of records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava's ``ArrayListMultimap``, which is a ``ListMultimap``). Note that a field can have multiple values and any two records need not use common field names. +Morphlines manipulate continuous streams of records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava's ``ArrayListMultimap``, which is a ``ListMultimap``). Note that a field can have multiple values and any two records need not use common field names. This sink fills the body of the Flume event into the ``_attachment_body`` field of the morphline record, as well as copies the headers of the Flume event into record fields of the same name. The commands can then act on this data. Routing to a SolrCloud cluster is supported to improve scalability. Indexing load can be spread across a large number of MorphlineSolrSinks for improved scalability. Indexing load can be replicated across multiple MorphlineSolrSinks for high availability, for example using Flume features such as Load balancing Sink Processor. MorphlineInterceptor can also help to implement dynamic routing to multiple Solr collections (e.g. for multi-tenancy). -The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation. +The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation. The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink @@ -2280,11 +2558,11 @@ ElasticSearchSink ~~~~~~~~~~~~~~~~~ This sink writes data to an elasticsearch cluster. By default, events will be written so that the `Kibana `_ graphical interface -can display them - just as if `logstash `_ wrote them. +can display them - just as if `logstash `_ wrote them. -The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. +The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version -of the JVM. SerializationExceptions will appear if this is incorrect. To +of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. 
Once the elasticsearch version has been determined then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent @@ -2416,52 +2694,212 @@ Kafka Sink This is a Flume Sink implementation that can publish data to a `Kafka `_ topic. One of the objective is to integrate Flume with Kafka so that pull based processing systems can process the data coming -through various Flume sources. This currently supports Kafka 0.8.x series of releases. +through various Flume sources. This currently supports Kafka 0.9.x series of releases. + +This version of Flume no longer supports Older Versions (0.8.x) of Kafka. Required properties are marked in bold font. -=============================== =================== ============================================================================================= -Property Name Default Description -=============================== =================== ============================================================================================= -**type** -- Must be set to ``org.apache.flume.sink.kafka.KafkaSink`` -**brokerList** -- List of brokers Kafka-Sink will connect to, to get the list of topic partitions - This can be a partial list of brokers, but we recommend at least two for HA. - The format is comma separated list of hostname:port -topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, - messages will be published to this topic. - If the event header contains a "topic" field, the event will be published to that topic - overriding the topic configured here. -batchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency. -requiredAcks 1 How many replicas must acknowledge a message before its considered successfully written. - Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) - Set this to -1 to avoid data loss in some cases of leader failure. -Other Kafka Producer Properties -- These properties are used to configure the Kafka Producer. Any producer property supported - by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``. - For example: kafka.producer.type -=============================== =================== ============================================================================================= +================================== =================== ============================================================================================= +Property Name Default Description +================================== =================== ============================================================================================= +**type** -- Must be set to ``org.apache.flume.sink.kafka.KafkaSink`` +**kafka.bootstrap.servers** -- List of brokers Kafka-Sink will connect to, to get the list of topic partitions + This can be a partial list of brokers, but we recommend at least two for HA. + The format is comma separated list of hostname:port +kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, + messages will be published to this topic. + If the event header contains a "topic" field, the event will be published to that topic + overriding the topic configured here. +flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency. 
+kafka.producer.acks 1 How many replicas must acknowledge a message before it's considered successfully written. + Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) + Set this to -1 to avoid data loss in some cases of leader failure. +useFlumeEventFormat false By default events are put as bytes onto the Kafka topic directly from the event body. Set to + true to store events as the Flume Avro binary format. Used in conjunction with the same property + on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve + any Flume headers for the producing side. +defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless + overridden by ``partitionIdHeader``. By default, if this property is not set, events will be + distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a + partitioner specified by ``kafka.partitioner.class``). +partitionIdHeader -- When set, the sink will take the value of the field named using the value of this property + from the event header and send the message to the specified partition of the topic. If the + value represents an invalid partition, an EventDeliveryException will be thrown. If the header value + is present then this setting overrides ``defaultPartitionId``. +kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. +*more producer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security `_ for additional + properties that need to be set on the producer. +Other Kafka Producer Properties -- These properties are used to configure the Kafka Producer. Any producer property supported + by Kafka can be used. The only requirement is to prepend the property name with the prefix + ``kafka.producer``. + For example: kafka.producer.linger.ms +================================== =================== ============================================================================================= .. note:: Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka. If ``topic`` exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If ``key`` exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with the same key will be sent to the same partition. If the key is null, events will be sent to random partitions. +The Kafka sink also provides defaults for the key.serializer(org.apache.kafka.common.serialization.StringSerializer) +and value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
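As an illustration of the ``key`` header described in the note above, a source-side Static Interceptor can stamp a constant ``key`` header on every event so that all of them land on the same Kafka partition. The agent, source and interceptor names below are placeholders and the header value is arbitrary; this is a minimal sketch rather than a required setup.

.. code-block:: properties

    # placeholder agent/source names; the "key" header set here is read by the Kafka sink
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = key
    a1.sources.r1.interceptors.i1.value = tenant-42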
+ +Deprecated Properties + +=============================== =================== ============================================================================================= +Property Name Default Description +=============================== =================== ============================================================================================= +brokerList -- Use kafka.bootstrap.servers +topic default-flume-topic Use kafka.topic +batchSize 100 Use kafka.flumeBatchSize +requiredAcks 1 Use kafka.producer.acks + +=============================== =================== ============================================================================================= + An example configuration of a Kafka sink is given below. Properties starting -with the prefix ``kafka`` (the last 3 properties) are used when instantiating -the Kafka producer. The properties that are passed when creating the Kafka +with the prefix ``kafka.producer`` are used when instantiating the Kafka producer. The properties that are passed when creating the Kafka producer are not limited to the properties given in this example. -Also it's possible include your custom properties here and access them inside +Also it is possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument. .. code-block:: properties - a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink - a1.sinks.k1.topic = mytopic - a1.sinks.k1.brokerList = localhost:9092 - a1.sinks.k1.requiredAcks = 1 - a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1 + a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink + a1.sinks.k1.kafka.topic = mytopic + a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 + a1.sinks.k1.kafka.flumeBatchSize = 20 + a1.sinks.k1.kafka.producer.acks = 1 + a1.sinks.k1.kafka.producer.linger.ms = 1 + a1.sinks.k1.kafka.producer.compression.type = snappy + + +**Security and Kafka Sink:** + +Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. +For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0. + +As of now data encryption is solely provided by SSL/TLS. + +Setting ``kafka.producer.security.protocol`` to any of the following values means: + +- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption +- **SASL_SSL** - Kerberos or plaintext authentication with data encryption +- **SSL** - TLS based encryption with optional authentication. + +.. warning:: + There is a performance degradation when SSL is enabled, + the magnitude of which depends on the CPU type and the JVM implementation. + Reference: `Kafka security overview `_ + and the JIRA for tracking this issue: + `KAFKA-2561 `__ + + +**TLS and Kafka Sink:** + +Please read the steps described in `Configuring Kafka Clients SSL `_ +to learn about additional configuration settings for fine tuning, for example any of the following: +security provider, cipher suites, enabled protocols, truststore or keystore types. + +Example configuration with server side authentication and data encryption. + +..
+
+**Security and Kafka Sink:**
+
+Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.producer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview `_
+    and the jira for tracking this issue:
+    `KAFKA-2561 `__
+
+
+**TLS and Kafka Sink:**
+
+Please read the steps described in `Configuring Kafka Clients SSL `_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+   a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+   a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+   a1.sinks.sink1.kafka.topic = mytopic
+   a1.sinks.sink1.kafka.producer.security.protocol = SSL
+   a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+   a1.sinks.sink1.kafka.producer.ssl.truststore.password =
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following property:
+
+.. code-block:: properties
+
+   a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+   a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
+   a1.sinks.sink1.kafka.producer.ssl.keystore.password =
+
+If the keystore and the key use different password protection then the ``ssl.key.password`` property will
+provide the required additional secret for the producer keystore:
+
+.. code-block:: properties
+
+   a1.sinks.sink1.kafka.producer.ssl.key.password =
+
+
+**Kerberos and Kafka Sink:**
+
+To use the Kafka sink with a Kafka cluster secured with Kerberos, set the ``kafka.producer.security.protocol`` property noted above for the producer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed.
+See the `Kafka doc `_
+for information on the JAAS file contents. The location of this JAAS file and optionally the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
+
+   JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+   JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
+
+Example secure configuration using SASL_PLAINTEXT:
+
+.. code-block:: properties
+
+   a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+   a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+   a1.sinks.sink1.kafka.topic = mytopic
+   a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
+   a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
+   a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
+
+Example secure configuration using SASL_SSL:
+
+.. code-block:: properties
+
+   a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+   a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+   a1.sinks.sink1.kafka.topic = mytopic
+   a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
+   a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
+   a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
+   a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+   a1.sinks.sink1.kafka.producer.ssl.truststore.password =
+
+
+Sample JAAS file. For a reference of its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of `SASL configuration `_.
+Unlike the Kafka Source or Kafka Channel, a "Client" section is not required, unless it is needed by other connecting components. Also please make sure
+that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
+
+.. code-block:: javascript
+
+   KafkaClient {
+     com.sun.security.auth.module.Krb5LoginModule required
+     useKeyTab=true
+     storeKey=true
+     keyTab="/path/to/keytabs/flume.keytab"
+     principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+   };
+

 Custom Sink
 ~~~~~~~~~~~

@@ -2534,7 +2972,7 @@ Example for agent named a1:

     a1.channels.c1.transactionCapacity = 10000
     a1.channels.c1.byteCapacityBufferPercentage = 20
     a1.channels.c1.byteCapacity = 800000
-
+

 JDBC Channel
 ~~~~~~~~~~~~

@@ -2579,40 +3017,82 @@ The events are stored in a Kafka cluster
 replication, so in case an agent or a kafka broker crashes, the events are immediately available to other sinks

 The Kafka channel can be used for multiple scenarios:
-* With Flume source and sink - it provides a reliable and highly available channel for events
-* With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
-* With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sources such as HDFS, HBase or Solr
+
+#. With Flume source and sink - it provides a reliable and highly available channel for events
+#. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
+#. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr
+
+
+This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of
+the channel has changed compared to previous Flume versions.
+
+The configuration parameters are organized as follows (see the sketch after this list):
+
+#. Configuration values related to the channel generically are applied at the channel config level, e.g.: a1.channels.k1.type =
+#. Configuration values related to Kafka or how the Channel operates are prefixed with "kafka." (these are analogous to the CommonClient Configs), e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the hdfs sink operates
+#. Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
+#. Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks
+
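+For illustration only, a minimal sketch of how these prefix levels combine on a single channel definition
+(the component name ``k1`` and the two overridden producer/consumer properties are assumptions, not defaults):
+
+.. code-block:: properties
+
+   # channel-level value
+   a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
+   # "kafka."-prefixed values describing how the channel talks to Kafka
+   a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
+   a1.channels.k1.kafka.topic = channel1
+   # producer/consumer specific values, using the Kafka parameter names
+   a1.channels.k1.kafka.producer.acks = 1
+   a1.channels.k1.kafka.consumer.auto.offset.reset = earliest
+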
+
+This version of Flume is backwards-compatible with previous versions, however deprecated properties are indicated in the table below and a warning message
+is logged on startup when they are present in the configuration file.

 Required properties are in **bold**.

-====================== ========================== ===============================================================================================================
-Property Name Default Description
-====================== ========================== ===============================================================================================================
-**type** -- The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
-**brokerList** -- List of brokers in the Kafka cluster used by the channel
- This can be a partial list of brokers, but we recommend at least two for HA.
- The format is comma separated list of hostname:port
-**zookeeperConnect** -- URI of ZooKeeper used by Kafka cluster
- The format is comma separated list of hostname:port. If chroot is used, it is added once at the end.
- For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka
-topic flume-channel Kafka topic which the channel will use
-groupId flume Consumer group ID the channel uses to register with Kafka.
- Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data
- Note that having non-channel consumers with the same ID can lead to data loss.
-parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel.
- This should be true if Flume source is writing to the channel
- And false if other producers are writing into the topic that the channel is using
- Flume source messages to Kafka can be parsed outside of Flume by using
- org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
-readSmallestOffset false When set to true, the channel will read all data in the topic, starting from the oldest event
- when false, it will read only events written after the channel started
- When "parseAsFlumeEvent" is true, this will be false. Flume source will start prior to the sinks and this
- guarantees that events sent by source before sinks start will not be lost.
-Other Kafka Properties -- These properties are used to configure the Kafka Producer and Consumer used by the channel.
- Any property supported by Kafka can be used.
- The only requirement is to prepend the property name with the prefix ``kafka.``.
- For example: kafka.producer.type
-====================== ========================== ===============================================================================================================
+======================================= ========================== ===============================================================================================================
+Property Name Default Description
+======================================= ========================== ===============================================================================================================
+**type** -- The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
+**kafka.bootstrap.servers** -- List of brokers in the Kafka cluster used by the channel
+ This can be a partial list of brokers, but we recommend at least two for HA.
+ The format is a comma separated list of hostname:port
+kafka.topic flume-channel Kafka topic which the channel will use
+kafka.consumer.group.id flume Consumer group ID the channel uses to register with Kafka.
+ Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data.
+ Note that having non-channel consumers with the same ID can lead to data loss.
+
+parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel.
+ This should be true if a Flume source is writing to the channel and false if other producers are
+ writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
+ org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
+migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
+ This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set
+ to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset
+ configuration defines how offsets are handled.
+pollTimeout 500 The amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
+ https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
+defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
+ overridden by ``partitionIdHeader``. By default, if this property is not set, events will be
+ distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a
+ partitioner specified by ``kafka.partitioner.class``).
+partitionIdHeader -- When set, the producer will take the value of the field named using the value of this property
+ from the event header and send the message to the specified partition of the topic. If the
+ value represents an invalid partition the event will not be accepted into the channel. If the header value
+ is present then this setting overrides ``defaultPartitionId``.
+kafka.consumer.auto.offset.reset latest What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
+ (e.g. because that data has been deleted):
+ earliest: automatically reset the offset to the earliest offset
+ latest: automatically reset the offset to the latest offset
+ none: throw exception to the consumer if no previous offset is found for the consumer's group
+ anything else: throw exception to the consumer.
+kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
+kafka.consumer.security.protocol PLAINTEXT Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
+*more producer/consumer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to `Kafka security `_ for additional
+ properties that need to be set on the producer/consumer.
+======================================= ========================== ===============================================================================================================
+
+Deprecated Properties
+
+================================ ========================== ===============================================================================================================
+Property Name Default Description
+================================ ========================== ===============================================================================================================
+brokerList -- Use kafka.bootstrap.servers
+topic flume-channel Use kafka.topic
+groupId flume Use kafka.consumer.group.id
+readSmallestOffset false Use kafka.consumer.auto.offset.reset
+
+================================ ========================== ===============================================================================================================

 .. note:: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up

@@ -2620,12 +3100,162 @@ Example for agent named a1:

 .. code-block:: properties

-    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
-    a1.channels.channel1.capacity = 10000
-    a1.channels.channel1.transactionCapacity = 1000
-    a1.channels.channel1.brokerList=kafka-2:9092,kafka-3:9092
-    a1.channels.channel1.topic=channel1
-    a1.channels.channel1.zookeeperConnect=kafka-1:2181
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+
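+
+To illustrate the first scenario in the list above, a minimal end-to-end sketch is shown below that wires a netcat
+source and a logger sink through the Kafka channel. The source, sink and their settings are illustrative choices,
+not part of the Kafka channel itself.
+
+.. code-block:: properties
+
+    a1.sources = r1
+    a1.channels = channel1
+    a1.sinks = k1
+
+    a1.sources.r1.type = netcat
+    a1.sources.r1.bind = localhost
+    a1.sources.r1.port = 44444
+    a1.sources.r1.channels = channel1
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+
+    a1.sinks.k1.type = logger
+    a1.sinks.k1.channel = channel1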
+
+**Security and Kafka Channel:**
+
+Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.producer|consumer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview `_
+    and the jira for tracking this issue:
+    `KAFKA-2561 `_
+
+
+**TLS and Kafka Channel:**
+
+Please read the steps described in `Configuring Kafka Clients SSL `_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+   a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+   a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+   a1.channels.channel1.kafka.topic = channel1
+   a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+   a1.channels.channel1.kafka.producer.security.protocol = SSL
+   a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+   a1.channels.channel1.kafka.producer.ssl.truststore.password =
+   a1.channels.channel1.kafka.consumer.security.protocol = SSL
+   a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+   a1.channels.channel1.kafka.consumer.ssl.truststore.password =
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties:
+
+.. code-block:: properties
+
+   a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+   a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+   a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
+   a1.channels.channel1.kafka.producer.ssl.keystore.password =
+   a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
+   a1.channels.channel1.kafka.consumer.ssl.keystore.password =
+
+If the keystore and the key use different password protection then the ``ssl.key.password`` property will
+provide the required additional secret for both the consumer and producer keystores:
+
+.. code-block:: properties
+
+   a1.channels.channel1.kafka.producer.ssl.key.password =
+   a1.channels.channel1.kafka.consumer.ssl.key.password =
+
+
+**Kerberos and Kafka Channel:**
+
+To use the Kafka channel with a Kafka cluster secured with Kerberos, set the ``kafka.producer|consumer.security.protocol`` properties noted above for the producer and/or consumer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed.
+See the `Kafka doc `_

[... 185 lines stripped ...]