flume-commits mailing list archives

From build...@apache.org
Subject svn commit: r999548 [2/6] - in /websites/staging/flume/trunk/content: ./ .doctrees/ .doctrees/releases/ _sources/ _sources/releases/ releases/
Date Mon, 17 Oct 2016 12:35:17 GMT
Modified: websites/staging/flume/trunk/content/FlumeUserGuide.html
==============================================================================
--- websites/staging/flume/trunk/content/FlumeUserGuide.html (original)
+++ websites/staging/flume/trunk/content/FlumeUserGuide.html Mon Oct 17 12:35:17 2016
@@ -7,7 +7,7 @@
   <head>
     <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
     
-    <title>Flume 1.6.0 User Guide &mdash; Apache Flume</title>
+    <title>Flume 1.7.0 User Guide &mdash; Apache Flume</title>
     
     <link rel="stylesheet" href="_static/flume.css" type="text/css" />
     <link rel="stylesheet" href="_static/pygments.css" type="text/css" />
@@ -26,7 +26,7 @@
     <script type="text/javascript" src="_static/doctools.js"></script>
     <link rel="top" title="Apache Flume" href="index.html" />
     <link rel="up" title="Documentation" href="documentation.html" />
-    <link rel="next" title="Flume 1.6.0 Developer Guide" href="FlumeDeveloperGuide.html" />
+    <link rel="next" title="Flume 1.7.0 Developer Guide" href="FlumeDeveloperGuide.html" />
     <link rel="prev" title="Documentation" href="documentation.html" /> 
   </head>
   <body>
@@ -59,8 +59,8 @@
         <div class="bodywrapper">
           <div class="body">
             
-  <div class="section" id="flume-1-6-0-user-guide">
-<h1>Flume 1.6.0 User Guide<a class="headerlink" href="#flume-1-6-0-user-guide" title="Permalink to this headline">¶</a></h1>
+  <div class="section" id="flume-1-7-0-user-guide">
+<h1>Flume 1.7.0 User Guide<a class="headerlink" href="#flume-1-7-0-user-guide" title="Permalink to this headline">¶</a></h1>
 <div class="section" id="introduction">
 <h2>Introduction<a class="headerlink" href="#introduction" title="Permalink to this headline">¶</a></h2>
 <div class="section" id="overview">
@@ -84,7 +84,7 @@ in the latest architecture.</p>
 <div class="section" id="system-requirements">
 <h3>System Requirements<a class="headerlink" href="#system-requirements" title="Permalink to this headline">¶</a></h3>
 <ol class="arabic simple">
-<li>Java Runtime Environment - Java 1.6 or later (Java 1.7 Recommended)</li>
+<li>Java Runtime Environment - Java 1.7 or later</li>
 <li>Memory - Sufficient memory for configurations used by sources, channels or sinks</li>
 <li>Disk Space - Sufficient disk space for configurations used by channels or sinks</li>
 <li>Directory Permissions - Read/Write permissions for directories used by agent</li>
@@ -248,6 +248,30 @@ OK</pre>
 </div>
 <p>Congratulations - you&#8217;ve successfully configured and deployed a Flume agent! Subsequent sections cover agent configuration in much more detail.</p>
 </div>
+<div class="section" id="logging-raw-data">
+<h4>Logging raw data<a class="headerlink" href="#logging-raw-data" title="Permalink to this headline">¶</a></h4>
+<p>Logging the raw stream of data flowing through the ingest pipeline is not desired behaviour in
+many production environments, because it may leak sensitive data or security-related
+configurations, such as secret keys, into the Flume log files.
+By default, Flume will not log such information. On the other hand, if the data pipeline is broken,
+Flume will attempt to provide clues for debugging the problem.</p>
+<p>One way to debug problems with event pipelines is to set up an additional <a class="reference internal" href="#memory-channel">Memory Channel</a>
+connected to a <a class="reference internal" href="#logger-sink">Logger Sink</a>, which will output all event data to the Flume logs.
+In some situations, however, this approach is insufficient.</p>
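+<p>For illustration, such a debug tap might look like the following sketch, assuming an existing
+agent <tt class="docutils literal"><span class="pre">a1</span></tt> with a source <tt class="docutils literal"><span class="pre">r1</span></tt>, a sink <tt class="docutils literal"><span class="pre">k1</span></tt> and a primary channel <tt class="docutils literal"><span class="pre">c1</span></tt>
+(the default replicating selector copies every event to both channels):</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="c"># hypothetical debug tap: the names debugCh/debugSink are illustrative</span>
+<span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1 debugCh</span>
+<span class="na">a1.sinks</span> <span class="o">=</span> <span class="s">k1 debugSink</span>
+<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1 debugCh</span>
+<span class="na">a1.channels.debugCh.type</span> <span class="o">=</span> <span class="s">memory</span>
+<span class="na">a1.sinks.debugSink.type</span> <span class="o">=</span> <span class="s">logger</span>
+<span class="na">a1.sinks.debugSink.channel</span> <span class="o">=</span> <span class="s">debugCh</span>
+</pre></div>
+</div>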
+<p>In order to enable logging of event- and configuration-related data, some Java system properties
+must be set in addition to log4j properties.</p>
+<p>To enable configuration-related logging, set the Java system property
+<tt class="docutils literal"><span class="pre">-Dorg.apache.flume.log.printconfig=true</span></tt>. This can either be passed on the command line or by
+setting this in the <tt class="docutils literal"><span class="pre">JAVA_OPTS</span></tt> variable in <em>flume-env.sh</em>.</p>
+<p>To enable data logging, set the Java system property <tt class="docutils literal"><span class="pre">-Dorg.apache.flume.log.rawdata=true</span></tt>
+in the same way described above. For most components, the log4j logging level must also be set to
+DEBUG or TRACE to make event-specific logging appear in the Flume logs.</p>
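+<p>For instance, the <em>flume-env.sh</em> route might look like the following sketch (assuming the
+stock <em>conf/flume-env.sh</em> shipped with the binary distribution):</p>
+<div class="highlight-none"><div class="highlight"><pre># hypothetical conf/flume-env.sh entry; both properties default to false
+JAVA_OPTS=&quot;$JAVA_OPTS -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true&quot;
+</pre></div>
+</div>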
+<p>Here is an example of enabling both configuration logging and raw data logging while also
+setting the Log4j loglevel to DEBUG for console output:</p>
+<div class="highlight-none"><div class="highlight"><pre>$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
+</pre></div>
+</div>
+</div>
 <div class="section" id="zookeeper-based-configuration">
 <h4>Zookeeper based Configuration<a class="headerlink" href="#zookeeper-based-configuration" title="Permalink to this headline">¶</a></h4>
 <p>Flume supports Agent configurations via Zookeeper. <em>This is an experimental feature.</em> The configuration file needs to be uploaded
@@ -1048,7 +1072,7 @@ via FLUME_CLASSPATH variable in flume-en
 </tr>
 <tr class="row-odd"><td><strong>connectionFactory</strong></td>
 <td>&#8211;</td>
-<td>The JNDI name the connection factory shoulld appear as</td>
+<td>The JNDI name the connection factory should appear as</td>
 </tr>
 <tr class="row-even"><td><strong>providerURL</strong></td>
 <td>&#8211;</td>
@@ -1150,9 +1174,9 @@ cases in which events may be duplicated
 This is consistent with the guarantees offered by other Flume components.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="6%" />
-<col width="4%" />
-<col width="89%" />
+<col width="18%" />
+<col width="10%" />
+<col width="72%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -1197,39 +1221,60 @@ This is consistent with the guarantees o
 <td>basename</td>
 <td>Header Key to use when appending  basename of file to event header.</td>
 </tr>
-<tr class="row-odd"><td>ignorePattern</td>
+<tr class="row-odd"><td>includePattern</td>
+<td>^.*$</td>
+<td>Regular expression specifying which files to include.
+It can be used together with <tt class="docutils literal"><span class="pre">ignorePattern</span></tt>.
+If a file matches both <tt class="docutils literal"><span class="pre">ignorePattern</span></tt> and <tt class="docutils literal"><span class="pre">includePattern</span></tt> regex,
+the file is ignored.</td>
+</tr>
+<tr class="row-even"><td>ignorePattern</td>
 <td>^$</td>
-<td>Regular expression specifying which files to ignore (skip)</td>
+<td>Regular expression specifying which files to ignore (skip).
+It can be used together with <tt class="docutils literal"><span class="pre">includePattern</span></tt>.
+If a file matches both <tt class="docutils literal"><span class="pre">ignorePattern</span></tt> and <tt class="docutils literal"><span class="pre">includePattern</span></tt> regex,
+the file is ignored.</td>
 </tr>
-<tr class="row-even"><td>trackerDir</td>
+<tr class="row-odd"><td>trackerDir</td>
 <td>.flumespool</td>
 <td>Directory to store metadata related to processing of files.
 If this path is not an absolute path, then it is interpreted as relative to the spoolDir.</td>
 </tr>
-<tr class="row-odd"><td>consumeOrder</td>
+<tr class="row-even"><td>consumeOrder</td>
 <td>oldest</td>
 <td>In which order files in the spooling directory will be consumed <tt class="docutils literal"><span class="pre">oldest</span></tt>,
 <tt class="docutils literal"><span class="pre">youngest</span></tt> and <tt class="docutils literal"><span class="pre">random</span></tt>. In case of <tt class="docutils literal"><span class="pre">oldest</span></tt> and <tt class="docutils literal"><span class="pre">youngest</span></tt>, the last modified
 time of the files will be used to compare the files. In case of a tie, the file
-with smallest laxicographical order will be consumed first. In case of <tt class="docutils literal"><span class="pre">random</span></tt> any
+with smallest lexicographical order will be consumed first. In case of <tt class="docutils literal"><span class="pre">random</span></tt> any
 file will be picked randomly. When using <tt class="docutils literal"><span class="pre">oldest</span></tt> and <tt class="docutils literal"><span class="pre">youngest</span></tt> the whole
 directory will be scanned to pick the oldest/youngest file, which might be slow if there
 are a large number of files, while using <tt class="docutils literal"><span class="pre">random</span></tt> may cause old files to be consumed
 very late if new files keep coming in the spooling directory.</td>
 </tr>
-<tr class="row-even"><td>maxBackoff</td>
+<tr class="row-odd"><td>pollDelay</td>
+<td>500</td>
+<td>Delay (in milliseconds) used when polling for new files.</td>
+</tr>
+<tr class="row-even"><td>recursiveDirectorySearch</td>
+<td>false</td>
+<td>Whether to monitor sub directories for new files to read.</td>
+</tr>
+<tr class="row-odd"><td>maxBackoff</td>
 <td>4000</td>
-<td>The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.</td>
+<td>The maximum time (in millis) to wait between consecutive attempts to
+write to the channel(s) if the channel is full. The source will start at
+a low backoff and increase it exponentially each time the channel throws a
+ChannelException, up to the value specified by this parameter.</td>
 </tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-even"><td>batchSize</td>
 <td>100</td>
 <td>Granularity at which to batch transfer to the channel</td>
 </tr>
-<tr class="row-even"><td>inputCharset</td>
+<tr class="row-odd"><td>inputCharset</td>
 <td>UTF-8</td>
 <td>Character set used by deserializers that treat the input file as text.</td>
 </tr>
-<tr class="row-odd"><td>decodeErrorPolicy</td>
+<tr class="row-even"><td>decodeErrorPolicy</td>
 <td><tt class="docutils literal"><span class="pre">FAIL</span></tt></td>
 <td>What to do when we see a non-decodable character in the input file.
 <tt class="docutils literal"><span class="pre">FAIL</span></tt>: Throw an exception and fail to parse the file.
@@ -1237,37 +1282,37 @@ very late if new files keep coming in th
 typically Unicode U+FFFD.
 <tt class="docutils literal"><span class="pre">IGNORE</span></tt>: Drop the unparseable character sequence.</td>
 </tr>
-<tr class="row-even"><td>deserializer</td>
+<tr class="row-odd"><td>deserializer</td>
 <td><tt class="docutils literal"><span class="pre">LINE</span></tt></td>
 <td>Specify the deserializer used to parse the file into events.
 Defaults to parsing each line as an event. The class specified must implement
 <tt class="docutils literal"><span class="pre">EventDeserializer.Builder</span></tt>.</td>
 </tr>
-<tr class="row-odd"><td>deserializer.*</td>
+<tr class="row-even"><td>deserializer.*</td>
 <td>&nbsp;</td>
 <td>Varies per event deserializer.</td>
 </tr>
-<tr class="row-even"><td>bufferMaxLines</td>
+<tr class="row-odd"><td>bufferMaxLines</td>
 <td>&#8211;</td>
<td>(Obsolete) This option is now ignored.</td>
 </tr>
-<tr class="row-odd"><td>bufferMaxLineLength</td>
+<tr class="row-even"><td>bufferMaxLineLength</td>
 <td>5000</td>
 <td>(Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.</td>
 </tr>
-<tr class="row-even"><td>selector.type</td>
+<tr class="row-odd"><td>selector.type</td>
 <td>replicating</td>
 <td>replicating or multiplexing</td>
 </tr>
-<tr class="row-odd"><td>selector.*</td>
+<tr class="row-even"><td>selector.*</td>
 <td>&nbsp;</td>
 <td>Depends on the selector.type value</td>
 </tr>
-<tr class="row-even"><td>interceptors</td>
+<tr class="row-odd"><td>interceptors</td>
 <td>&#8211;</td>
 <td>Space-separated list of interceptors</td>
 </tr>
-<tr class="row-odd"><td>interceptors.*</td>
+<tr class="row-even"><td>interceptors.*</td>
 <td>&nbsp;</td>
 <td>&nbsp;</td>
 </tr>
@@ -1383,11 +1428,125 @@ inefficient compared to <tt class="docut
 </div>
 </div>
 </div>
+<div class="section" id="taildir-source">
+<h4>Taildir Source<a class="headerlink" href="#taildir-source" title="Permalink to this headline">¶</a></h4>
+<div class="admonition note">
+<p class="first admonition-title">Note</p>
+<p class="last"><strong>This source is provided as a preview feature. It does not work on Windows.</strong></p>
+</div>
+<p>Watch the specified files, and tail them in near real-time once new lines are detected being appended to each file.
+If new lines are still being written, this source will retry reading them while waiting for the write to complete.</p>
+<p>This source is reliable and will not miss data even when the tailed files rotate.
+It periodically writes the last read position of each file to the given position file in JSON format.
+If Flume is stopped or goes down for some reason, it can restart tailing from the position recorded in the existing position file.</p>
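+<p>For reference, the position file content might look roughly like the following sketch (the
+inode, position and path values are purely illustrative):</p>
+<div class="highlight-none"><div class="highlight"><pre>[{&quot;inode&quot;:39894,&quot;pos&quot;:5667,&quot;file&quot;:&quot;/var/log/test1/example.log&quot;}]
+</pre></div>
+</div>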
+<p>Alternatively, this source can start tailing from an arbitrary position for each file, using the given position file.
+When there is no position file at the specified path, it will start tailing from the first line of each file by default.</p>
+<p>Files will be consumed in order of their modification time: the file with the oldest modification time will be consumed first.</p>
+<p>This source does not rename, delete, or otherwise modify the files being tailed.
+Currently this source does not support tailing binary files. It reads text files line by line.</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="19%" />
+<col width="16%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td><strong>channels</strong></td>
+<td>&#8211;</td>
+<td>&nbsp;</td>
+</tr>
+<tr class="row-odd"><td><strong>type</strong></td>
+<td>&#8211;</td>
+<td>The component type name, needs to be <tt class="docutils literal"><span class="pre">TAILDIR</span></tt>.</td>
+</tr>
+<tr class="row-even"><td><strong>filegroups</strong></td>
+<td>&#8211;</td>
+<td>Space-separated list of file groups. Each file group indicates a set of files to be tailed.</td>
+</tr>
+<tr class="row-odd"><td><strong>filegroups.&lt;filegroupName&gt;</strong></td>
+<td>&#8211;</td>
+<td>Absolute path of the file group. Regular expressions (not file system patterns) can be used for the filename only.</td>
+</tr>
+<tr class="row-even"><td>positionFile</td>
+<td>~/.flume/taildir_position.json</td>
+<td>File in JSON format to record the inode, the absolute path and the last position of each tailing file.</td>
+</tr>
+<tr class="row-odd"><td>headers.&lt;filegroupName&gt;.&lt;headerKey&gt;</td>
+<td>&#8211;</td>
+<td>Header value to set with the given header key. Multiple headers can be specified for one file group.</td>
+</tr>
+<tr class="row-even"><td>byteOffsetHeader</td>
+<td>false</td>
+<td>Whether to add the byte offset of a tailed line to a header called &#8216;byteoffset&#8217;.</td>
+</tr>
+<tr class="row-odd"><td>skipToEnd</td>
+<td>false</td>
+<td>Whether to skip to EOF for files not recorded in the position file.</td>
+</tr>
+<tr class="row-even"><td>idleTimeout</td>
+<td>120000</td>
+<td>Time (ms) after which inactive files are closed. If new lines are appended to a closed file, this source will automatically re-open it.</td>
+</tr>
+<tr class="row-odd"><td>writePosInterval</td>
+<td>3000</td>
+<td>Interval time (ms) at which the last position of each file is written to the position file.</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
+<td>100</td>
+<td>Max number of lines to read and send to the channel at a time. Using the default is usually fine.</td>
+</tr>
+<tr class="row-odd"><td>backoffSleepIncrement</td>
+<td>1000</td>
+<td>The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.</td>
+</tr>
+<tr class="row-even"><td>maxBackoffSleep</td>
+<td>5000</td>
+<td>The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.</td>
+</tr>
+<tr class="row-odd"><td>cachePatternMatching</td>
+<td>true</td>
+<td>Listing directories and applying the filename regex pattern may be time consuming for directories
+containing thousands of files. Caching the list of matching files can improve performance.
+The order in which files are consumed will also be cached.
+Requires that the file system keeps track of modification times with at least a 1-second granularity.</td>
+</tr>
+<tr class="row-even"><td>fileHeader</td>
+<td>false</td>
+<td>Whether to add a header storing the absolute path filename.</td>
+</tr>
+<tr class="row-odd"><td>fileHeaderKey</td>
+<td>file</td>
+<td>Header key to use when appending absolute path filename to event header.</td>
+</tr>
+</tbody>
+</table>
+<p>Example for agent named a1:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span>
+<span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">TAILDIR</span>
+<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sources.r1.positionFile</span> <span class="o">=</span> <span class="s">/var/log/flume/taildir_position.json</span>
+<span class="na">a1.sources.r1.filegroups</span> <span class="o">=</span> <span class="s">f1 f2</span>
+<span class="na">a1.sources.r1.filegroups.f1</span> <span class="o">=</span> <span class="s">/var/log/test1/example.log</span>
+<span class="na">a1.sources.r1.headers.f1.headerKey1</span> <span class="o">=</span> <span class="s">value1</span>
+<span class="na">a1.sources.r1.filegroups.f2</span> <span class="o">=</span> <span class="s">/var/log/test2/.*log.*</span>
+<span class="na">a1.sources.r1.headers.f2.headerKey1</span> <span class="o">=</span> <span class="s">value2</span>
+<span class="na">a1.sources.r1.headers.f2.headerKey2</span> <span class="o">=</span> <span class="s">value2-2</span>
+<span class="na">a1.sources.r1.fileHeader</span> <span class="o">=</span> <span class="s">true</span>
+</pre></div>
+</div>
+</div>
 <div class="section" id="twitter-1-firehose-source-experimental">
 <h4>Twitter 1% firehose Source (experimental)<a class="headerlink" href="#twitter-1-firehose-source-experimental" title="Permalink to this headline">¶</a></h4>
 <div class="admonition warning">
 <p class="first admonition-title">Warning</p>
-<p class="last">This source is hightly experimental and may change between minor versions of Flume.
+<p class="last">This source is highly experimental and may change between minor versions of Flume.
 Use at your own risk.</p>
 </div>
 <p>Experimental source that connects via Streaming API to the 1% sample twitter
@@ -1430,7 +1589,7 @@ Required properties are in <strong>bold<
 </tr>
 <tr class="row-odd"><td><strong>accessTokenSecret</strong></td>
 <td>&#8211;</td>
-<td>OAuth toekn secret</td>
+<td>OAuth token secret</td>
 </tr>
 <tr class="row-even"><td>maxBatchSize</td>
 <td>1000</td>
@@ -1458,14 +1617,14 @@ Required properties are in <strong>bold<
 </div>
 <div class="section" id="kafka-source">
 <h4>Kafka Source<a class="headerlink" href="#kafka-source" title="Permalink to this headline">¶</a></h4>
-<p>Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
+<p>Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the same Consumer Group
-so each will read a unique set of partitions for the topic.</p>
+so each will read a unique set of partitions for the topics.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="21%" />
-<col width="8%" />
-<col width="71%" />
+<col width="19%" />
+<col width="6%" />
+<col width="75%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -1480,68 +1639,245 @@ so each will read a unique set of partit
 </tr>
 <tr class="row-odd"><td><strong>type</strong></td>
 <td>&#8211;</td>
-<td>The component type name, needs to be <tt class="docutils literal"><span class="pre">org.apache.flume.source.kafka,KafkaSource</span></tt></td>
+<td>The component type name, needs to be <tt class="docutils literal"><span class="pre">org.apache.flume.source.kafka.KafkaSource</span></tt></td>
 </tr>
-<tr class="row-even"><td><strong>zookeeperConnect</strong></td>
+<tr class="row-even"><td><strong>kafka.bootstrap.servers</strong></td>
 <td>&#8211;</td>
-<td>URI of ZooKeeper used by Kafka cluster</td>
+<td>List of brokers in the Kafka cluster used by the source</td>
 </tr>
-<tr class="row-odd"><td><strong>groupId</strong></td>
+<tr class="row-odd"><td>kafka.consumer.group.id</td>
 <td>flume</td>
<td>Unique identifier of the consumer group. Setting the same id in multiple sources or agents
 indicates that they are part of the same consumer group</td>
 </tr>
-<tr class="row-even"><td><strong>topic</strong></td>
+<tr class="row-even"><td><strong>kafka.topics</strong></td>
 <td>&#8211;</td>
-<td>Kafka topic we&#8217;ll read messages from. At the time, this is a single topic only.</td>
+<td>Comma-separated list of topics the kafka consumer will read messages from.</td>
 </tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-odd"><td><strong>kafka.topics.regex</strong></td>
+<td>&#8211;</td>
+<td>Regex that defines the set of topics the source is subscribed to. This property has higher priority
+than <tt class="docutils literal"><span class="pre">kafka.topics</span></tt> and overrides <tt class="docutils literal"><span class="pre">kafka.topics</span></tt> if it exists.</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
 <td>1000</td>
 <td>Maximum number of messages written to Channel in one batch</td>
 </tr>
-<tr class="row-even"><td>batchDurationMillis</td>
+<tr class="row-odd"><td>batchDurationMillis</td>
 <td>1000</td>
<td>Maximum time (in ms) before a batch will be written to the Channel.
The batch will be written as soon as either the size limit or the time limit is reached.</td>
 </tr>
-<tr class="row-odd"><td>backoffSleepIncrement</td>
+<tr class="row-even"><td>backoffSleepIncrement</td>
 <td>1000</td>
 <td>Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty.
 Wait period will reduce aggressive pinging of an empty Kafka Topic.  One second is ideal for
 ingestion use cases but a lower value may be required for low latency operations with
 interceptors.</td>
 </tr>
-<tr class="row-even"><td>maxBackoffSleep</td>
+<tr class="row-odd"><td>maxBackoffSleep</td>
 <td>5000</td>
 <td>Maximum wait time that is triggered when a Kafka Topic appears to be empty.  Five seconds is
 ideal for ingestion use cases but a lower value may be required for low latency operations
 with interceptors.</td>
 </tr>
-<tr class="row-odd"><td>Other Kafka Consumer Properties</td>
-<td>&#8211;</td>
-<td>These properties are used to configure the Kafka Consumer. Any producer property supported
-by Kafka can be used. The only requirement is to prepend the property name with the prefix <tt class="docutils literal"><span class="pre">kafka.</span></tt>.
-For example: kafka.consumer.timeout.ms
-Check <cite>Kafka documentation &lt;https://kafka.apache.org/08/configuration.html#consumerconfigs&gt;</cite> for details</td>
+<tr class="row-even"><td>useFlumeEventFormat</td>
+<td>false</td>
+<td>By default events are taken as bytes from the Kafka topic directly into the event body. Set to
+true to read events as the Flume Avro binary format. Used in conjunction with the same property
+on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve
+any Flume headers sent on the producing side.</td>
+</tr>
+<tr class="row-odd"><td>migrateZookeeperOffsets</td>
+<td>true</td>
+<td>When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
+This should be true to support seamless Kafka client migration from older versions of Flume.
+Once migrated this can be set to false, though that should generally not be required.
+If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset
+defines how offsets are handled.
+Check <a class="reference external" href="http://kafka.apache.org/documentation.html#newconsumerconfigs">Kafka documentation</a> for details</td>
+</tr>
+<tr class="row-even"><td>kafka.consumer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td><em>more consumer security props</em></td>
+<td>&nbsp;</td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference external" href="http://kafka.apache.org/documentation.html#security">Kafka security</a> for additional
+properties that need to be set on consumer.</td>
+</tr>
+<tr class="row-even"><td>Other Kafka Consumer Properties</td>
+<td>&#8211;</td>
+<td>These properties are used to configure the Kafka Consumer. Any consumer property supported
+by Kafka can be used. The only requirement is to prepend the property name with the prefix
+<tt class="docutils literal"><span class="pre">kafka.consumer</span></tt>.
+For example: <tt class="docutils literal"><span class="pre">kafka.consumer.auto.offset.reset</span></tt></td>
 </tr>
 </tbody>
 </table>
 <div class="admonition note">
 <p class="first admonition-title">Note</p>
 <p class="last">The Kafka Source overrides two Kafka consumer parameters:
-auto.commit.enable is set to &#8220;false&#8221; by the source and we commit every batch. For improved performance
-this can be set to &#8220;true&#8221;, however, this can lead to loss of data
-consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive
-setting this to a higher value can reduce CPU utilization (we&#8217;ll poll Kafka in less of a tight loop), but also means
-higher latency in writing batches to channel (since we&#8217;ll wait longer for data to arrive).</p>
+auto.commit.enable is set to &#8220;false&#8221; by the source and every batch is committed. The Kafka source guarantees an
+at-least-once message retrieval strategy, so duplicates can be present when the source starts.
+The Kafka Source also provides defaults for the key.deserializer (org.apache.kafka.common.serialization.StringDeserializer)
+and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modification of these parameters is not recommended.</p>
+</div>
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="22%" />
+<col width="13%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>topic</td>
+<td>&#8211;</td>
+<td>Use kafka.topics</td>
+</tr>
+<tr class="row-odd"><td>groupId</td>
+<td>flume</td>
+<td>Use kafka.consumer.group.id</td>
+</tr>
+<tr class="row-even"><td>zookeeperConnect</td>
+<td>&#8211;</td>
+<td>No longer supported by the Kafka consumer client since 0.9.x. Use kafka.bootstrap.servers
+to establish a connection with the Kafka cluster</td>
+</tr>
+</tbody>
+</table>
+<p>Example for topic subscription by comma-separated topic list.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">tier1.sources.source1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.source.kafka.KafkaSource</span>
+<span class="na">tier1.sources.source1.channels</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">tier1.sources.source1.batchSize</span> <span class="o">=</span> <span class="s">5000</span>
+<span class="na">tier1.sources.source1.batchDurationMillis</span> <span class="o">=</span> <span class="s">2000</span>
+<span class="na">tier1.sources.source1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">tier1.sources.source1.kafka.topics</span> <span class="o">=</span> <span class="s">test1, test2</span>
+<span class="na">tier1.sources.source1.kafka.consumer.group.id</span> <span class="o">=</span> <span class="s">custom.g.id</span>
+</pre></div>
 </div>
-<p>Example for agent named tier1:</p>
+<p>Example for topic subscription by regex.</p>
 <div class="highlight-properties"><div class="highlight"><pre><span class="na">tier1.sources.source1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.source.kafka.KafkaSource</span>
 <span class="na">tier1.sources.source1.channels</span> <span class="o">=</span> <span class="s">channel1</span>
-<span class="na">tier1.sources.source1.zookeeperConnect</span> <span class="o">=</span> <span class="s">localhost:2181</span>
-<span class="na">tier1.sources.source1.topic</span> <span class="o">=</span> <span class="s">test1</span>
-<span class="na">tier1.sources.source1.groupId</span> <span class="o">=</span> <span class="s">flume</span>
-<span class="na">tier1.sources.source1.kafka.consumer.timeout.ms</span> <span class="o">=</span> <span class="s">100</span>
+<span class="na">tier1.sources.source1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">tier1.sources.source1.kafka.topics.regex</span> <span class="o">=</span> <span class="s">^topic[0-9]$</span>
+<span class="c"># the default kafka.consumer.group.id=flume is used</span>
+</pre></div>
+</div>
+<p><strong>Security and Kafka Source:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.</p>
+<p>As of now data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span class="pre">kafka.consumer.security.protocol</span></tt> to any of the following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external" href="http://kafka.apache.org/documentation#security_overview">Kafka security overview</a>
+and the jira for tracking this issue:
+<a class="reference external" href="https://issues.apache.org/jira/browse/KAFKA-2561">KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Source:</strong></p>
+<p>Please read the steps described in <a class="reference external" href="http://kafka.apache.org/documentation#security_configclients">Configuring Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.</p>
+<p>Example configuration with server-side authentication and data encryption.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.sources.source1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.source.kafka.KafkaSource</span>
+<span class="na">a1.sources.source1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.sources.source1.kafka.topics</span> <span class="o">=</span> <span class="s">mytopic</span>
+<span class="na">a1.sources.source1.kafka.consumer.group.id</span> <span class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.sources.source1.kafka.consumer.security.protocol</span> <span class="o">=</span> <span class="s">SSL</span>
+<span class="na">a1.sources.source1.kafka.consumer.ssl.truststore.location</span><span class="o">=</span><span class="s">/path/to/truststore.jks</span>
+<span class="na">a1.sources.source1.kafka.consumer.ssl.truststore.password</span><span class="o">=</span><span class="s">&lt;password to access the truststore&gt;</span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following property:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm</span><span class="o">=</span><span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server&#8217;s fully qualified domain name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external" href="https://tools.ietf.org/html/rfc6125#section-2.3">https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external" href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6">https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client-side authentication is also required, the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.location</span><span class="o">=</span><span class="s">/path/to/client.keystore.jks</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.password</span><span class="o">=</span><span class="s">&lt;password to access the keystore&gt;</span>
+</pre></div>
+</div>
+<p>If the keystore and key use different password protection, then the <tt class="docutils literal"><span class="pre">ssl.key.password</span></tt> property will
+provide the required additional secret for the consumer keystore:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.kafka.consumer.ssl.key.password</span><span class="o">=</span><span class="s">&lt;password to access the key&gt;</span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Source:</strong></p>
+<p>To use Kafka source with a Kafka cluster secured with Kerberos, set the <tt class="docutils literal"><span class="pre">kafka.consumer.security.protocol</span></tt> property noted above for the consumer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file&#8217;s &#8220;KafkaClient&#8221; section. The &#8220;Client&#8221; section describes the Zookeeper connection, if needed.
+See <a class="reference external" href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig">Kafka doc</a>
+for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">JAVA_OPTS</span><span class="o">=</span><span class="s">&quot;$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf&quot;</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span class="s">&quot;$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf&quot;</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> <span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span> <span class="o">=</span> <span class="s">GSSAPI</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span> <span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> <span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span> <span class="o">=</span> <span class="s">GSSAPI</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span> <span class="o">=</span> <span class="s">kafka</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span><span class="o">=</span><span class="s">/path/to/truststore.jks</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.password</span><span class="o">=</span><span class="s">&lt;password to access the truststore&gt;</span>
+</pre></div>
+</div>
+<p>Sample JAAS file. For reference of its contents, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of <a class="reference external" href="http://kafka.apache.org/documentation#security_sasl_clientconfig">SASL configuration</a>.
+Since the Kafka Source may also connect to Zookeeper for offset migration, the &#8220;Client&#8221; section was also added to this example.
+This won&#8217;t be needed unless you require offset migration, or you require this section for other secure components.
+Also, please make sure that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.</p>
+<div class="highlight-javascript"><div class="highlight"><pre><span class="nx">Client</span> <span class="p">{</span>
+  <span class="nx">com</span><span class="p">.</span><span class="nb">sun</span><span class="p">.</span><span class="nx">security</span><span class="p">.</span><span class="nx">auth</span><span class="p">.</span><span class="nx">module</span><span class="p">.</span><span class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+  <span class="nx">useKeyTab</span><span class="o">=</span><span class="kc">true</span>
+  <span class="nx">storeKey</span><span class="o">=</span><span class="kc">true</span>
+  <span class="nx">keyTab</span><span class="o">=</span><span class="s2">&quot;/path/to/keytabs/flume.keytab&quot;</span>
+  <span class="nx">principal</span><span class="o">=</span><span class="s2">&quot;flume/flumehost1.example.com@YOURKERBEROSREALM&quot;</span><span class="p">;</span>
+<span class="p">};</span>
+
+<span class="nx">KafkaClient</span> <span class="p">{</span>
+  <span class="nx">com</span><span class="p">.</span><span class="nb">sun</span><span class="p">.</span><span class="nx">security</span><span class="p">.</span><span class="nx">auth</span><span class="p">.</span><span class="nx">module</span><span class="p">.</span><span class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+  <span class="nx">useKeyTab</span><span class="o">=</span><span class="kc">true</span>
+  <span class="nx">storeKey</span><span class="o">=</span><span class="kc">true</span>
+  <span class="nx">keyTab</span><span class="o">=</span><span class="s2">&quot;/path/to/keytabs/flume.keytab&quot;</span>
+  <span class="nx">principal</span><span class="o">=</span><span class="s2">&quot;flume/flumehost1.example.com@YOURKERBEROSREALM&quot;</span><span class="p">;</span>
+<span class="p">};</span>
 </pre></div>
 </div>
 </div>
@@ -1613,21 +1949,21 @@ Flume event and sent via the connected c
 <span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1</span>
 <span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">netcat</span>
 <span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">0.0.0.0</span>
-<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">6666</span>
+<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span class="s">6666</span>
 <span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1</span>
 </pre></div>
 </div>
 </div>
 <div class="section" id="sequence-generator-source">
 <h4>Sequence Generator Source<a class="headerlink" href="#sequence-generator-source" title="Permalink to this headline">¶</a></h4>
-<p>A simple sequence generator that continuously generates events with a counter
-that starts from 0 and increments by 1. Useful mainly for testing.
-Required properties are in <strong>bold</strong>.</p>
+<p>A simple sequence generator that continuously generates events with a counter that starts from 0,
+increments by 1 and stops at totalEvents. Retries when it can&#8217;t send events to the channel. Useful
+mainly for testing. Required properties are in <strong>bold</strong>.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="20%" />
-<col width="16%" />
-<col width="64%" />
+<col width="19%" />
+<col width="21%" />
+<col width="60%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -1664,6 +2000,10 @@ Required properties are in <strong>bold<
 <td>1</td>
 <td>&nbsp;</td>
 </tr>
+<tr class="row-odd"><td>totalEvents</td>
+<td>Long.MAX_VALUE</td>
+<td>Number of unique events sent by the source.</td>
+</tr>
 </tbody>
 </table>
 <p>Example for agent named a1:</p>
@@ -2395,8 +2735,8 @@ required.</p>
 <p>The following are the escape sequences supported:</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="10%" />
-<col width="90%" />
+<col width="15%" />
+<col width="85%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Alias</th>
@@ -2473,8 +2813,19 @@ required.</p>
 <tr class="row-even"><td>%z</td>
 <td>+hhmm numeric timezone (for example, -0400)</td>
 </tr>
+<tr class="row-odd"><td>%[localhost]</td>
+<td>Substitute the hostname of the host where the agent is running</td>
+</tr>
+<tr class="row-even"><td>%[IP]</td>
+<td>Substitute the IP address of the host where the agent is running</td>
+</tr>
+<tr class="row-odd"><td>%[FQDN]</td>
+<td>Substitute the canonical hostname of the host where the agent is running</td>
+</tr>
 </tbody>
 </table>
+<p>Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java&#8217;s ability to obtain the
+hostname, which may fail in some networking environments.</p>
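+<p>For instance, these escapes might be combined in an HDFS Sink path like the following sketch
+(the directory layout is illustrative; the time escapes assume events carry a timestamp header, or
+that <tt class="docutils literal"><span class="pre">hdfs.useLocalTimeStamp</span></tt> is set to true):</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="c"># hypothetical layout: one directory per host per day</span>
+<span class="na">a1.sinks.k1.hdfs.path</span> <span class="o">=</span> <span class="s">/flume/events/%[FQDN]/%y-%m-%d</span>
+</pre></div>
+</div>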
 <p>The file in use will have the name mangled to include &#8221;.tmp&#8221; at the end. Once
 the file is closed, this extension is removed. This allows excluding partially
 complete files in the directory.
@@ -2662,8 +3013,7 @@ timestamp 11:54:34 AM, June 12, 2012 wil
 Events are written using Hive transactions. As soon as a set of events are committed to Hive, they become
immediately visible to Hive queries. Partitions to which Flume will stream can either be pre-created
 or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to
-corresponding columns in the Hive table. <strong>This sink is provided as a preview feature and not recommended
-for use in production.</strong></p>
+corresponding columns in the Hive table.</p>
 <table border="1" class="docutils">
 <colgroup>
 <col width="15%" />
@@ -2918,8 +3268,9 @@ accept tab separated input containing th
 </div>
 <div class="section" id="logger-sink">
 <h4>Logger Sink<a class="headerlink" href="#logger-sink" title="Permalink to this headline">¶</a></h4>
-<p>Logs event at INFO level. Typically useful for testing/debugging purpose.
-Required properties are in <strong>bold</strong>.</p>
+<p>Logs events at the INFO level. Typically useful for testing/debugging purposes. Required properties are
+in <strong>bold</strong>. This sink is the only exception that doesn&#8217;t require the extra configuration
+explained in the <a class="reference internal" href="#logging-raw-data">Logging raw data</a> section.</p>
 <table border="1" class="docutils">
 <colgroup>
 <col width="20%" />
@@ -3243,9 +3594,9 @@ backslash, like this: &#8220;\n&#8221;)<
 Required properties are in <strong>bold</strong>.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="13%" />
+<col width="17%" />
 <col width="5%" />
-<col width="82%" />
+<col width="78%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -3266,15 +3617,27 @@ Required properties are in <strong>bold<
 <td>&#8211;</td>
 <td>The directory where files will be stored</td>
 </tr>
-<tr class="row-odd"><td>sink.rollInterval</td>
+<tr class="row-odd"><td>sink.pathManager</td>
+<td>DEFAULT</td>
+<td>The PathManager implementation to use.</td>
+</tr>
+<tr class="row-even"><td>sink.pathManager.extension</td>
+<td>&#8211;</td>
+<td>The file extension if the default PathManager is used.</td>
+</tr>
+<tr class="row-odd"><td>sink.pathManager.prefix</td>
+<td>&#8211;</td>
+<td>A character string to add to the beginning of the file name if the default PathManager is used.</td>
+</tr>
+<tr class="row-even"><td>sink.rollInterval</td>
 <td>30</td>
 <td>Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.</td>
 </tr>
-<tr class="row-even"><td>sink.serializer</td>
+<tr class="row-odd"><td>sink.serializer</td>
 <td>TEXT</td>
 <td>Other possible options include <tt class="docutils literal"><span class="pre">avro_event</span></tt> or the FQCN of an implementation of EventSerializer.Builder interface.</td>
 </tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-even"><td>batchSize</td>
 <td>100</td>
 <td>&nbsp;</td>
 </tr>
@@ -3827,13 +4190,14 @@ the kerberos principal</td>
 <p>This is a Flume Sink implementation that can publish data to a
 <a class="reference external" href="http://kafka.apache.org/">Kafka</a> topic. One of the objective is to integrate Flume
 with Kafka so that pull based processing systems can process the data coming
-through various Flume sources. This currently supports Kafka 0.8.x series of releases.</p>
+through various Flume sources. This currently supports Kafka 0.9.x series of releases.</p>
+<p>This version of Flume no longer supports older versions (0.8.x) of Kafka.</p>
 <p>Required properties are marked in bold font.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="20%" />
-<col width="12%" />
-<col width="68%" />
+<col width="18%" />
+<col width="10%" />
+<col width="72%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -3846,34 +4210,65 @@ through various Flume sources. This curr
 <td>&#8211;</td>
 <td>Must be set to <tt class="docutils literal"><span class="pre">org.apache.flume.sink.kafka.KafkaSink</span></tt></td>
 </tr>
-<tr class="row-odd"><td><strong>brokerList</strong></td>
+<tr class="row-odd"><td><strong>kafka.bootstrap.servers</strong></td>
 <td>&#8211;</td>
<td>List of brokers Kafka-Sink will connect to, to get the list of topic partitions.
This can be a partial list of brokers, but we recommend at least two for HA.
The format is a comma-separated list of hostname:port</td>
 </tr>
-<tr class="row-even"><td>topic</td>
+<tr class="row-even"><td>kafka.topic</td>
 <td>default-flume-topic</td>
 <td>The topic in Kafka to which the messages will be published. If this parameter is configured,
 messages will be published to this topic.
 If the event header contains a &#8220;topic&#8221; field, the event will be published to that topic
 overriding the topic configured here.</td>
 </tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-odd"><td>flumeBatchSize</td>
 <td>100</td>
 <td>How many messages to process in one batch. Larger batches improve throughput while adding latency.</td>
 </tr>
-<tr class="row-even"><td>requiredAcks</td>
+<tr class="row-even"><td>kafka.producer.acks</td>
 <td>1</td>
<td>How many replicas must acknowledge a message before it&#8217;s considered successfully written.
Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas).
 Set this to -1 to avoid data loss in some cases of leader failure.</td>
 </tr>
-<tr class="row-odd"><td>Other Kafka Producer Properties</td>
+<tr class="row-odd"><td>useFlumeEventFormat</td>
+<td>false</td>
+<td>By default events are put as bytes onto the Kafka topic directly from the event body. Set to
+true to store events as the Flume Avro binary format. Used in conjunction with the same property
+on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve
+any Flume headers for the producing side.</td>
+</tr>
+<tr class="row-even"><td>defaultPartitionId</td>
+<td>&#8211;</td>
+<td>Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
+overridden by <tt class="docutils literal"><span class="pre">partitionIdHeader</span></tt>. By default, if this property is not set, events will be
+distributed by the Kafka Producer&#8217;s partitioner - including by <tt class="docutils literal"><span class="pre">key</span></tt> if specified (or by a
+partitioner specified by <tt class="docutils literal"><span class="pre">kafka.partitioner.class</span></tt>).</td>
+</tr>
+<tr class="row-odd"><td>partitionIdHeader</td>
+<td>&#8211;</td>
+<td>When set, the sink will take the value of the event header field named by this property
+and send the message to the specified partition of the topic. If the
+value represents an invalid partition, an EventDeliveryException will be thrown. If the header value
+is present then this setting overrides <tt class="docutils literal"><span class="pre">defaultPartitionId</span></tt>.</td>
+</tr>
+<tr class="row-even"><td>kafka.producer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td><em>more producer security props</em></td>
+<td>&nbsp;</td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference external" href="http://kafka.apache.org/documentation.html#security">Kafka security</a> for additional
+properties that need to be set on producer.</td>
+</tr>
+<tr class="row-even"><td>Other Kafka Producer Properties</td>
 <td>&#8211;</td>
 <td>These properties are used to configure the Kafka Producer. Any producer property supported
-by Kafka can be used. The only requirement is to prepend the property name with the prefix <tt class="docutils literal"><span class="pre">kafka.</span></tt>.
-For example: kafka.producer.type</td>
+by Kafka can be used. The only requirement is to prepend the property name with the prefix
+<tt class="docutils literal"><span class="pre">kafka.producer</span></tt>.
+For example: kafka.producer.linger.ms</td>
 </tr>
 </tbody>
 </table>
@@ -3884,19 +4279,152 @@ If <tt class="docutils literal"><span cl
 If <tt class="docutils literal"><span class="pre">key</span></tt> exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key
 will be sent to the same partition. If the key is null, events will be sent to random partitions.</p>
 </div>
+<p>The Kafka sink also provides defaults for the key.serializer(org.apache.kafka.common.serialization.StringSerializer)
+and value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.</p>
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="22%" />
+<col width="13%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>brokerList</td>
+<td>&#8211;</td>
+<td>Use kafka.bootstrap.servers</td>
+</tr>
+<tr class="row-odd"><td>topic</td>
+<td>default-flume-topic</td>
+<td>Use kafka.topic</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
+<td>100</td>
+<td>Use kafka.flumeBatchSize</td>
+</tr>
+<tr class="row-odd"><td>requiredAcks</td>
+<td>1</td>
+<td>Use kafka.producer.acks</td>
+</tr>
+</tbody>
+</table>
 <p>An example configuration of a Kafka sink is given below. Properties starting
-with the prefix <tt class="docutils literal"><span class="pre">kafka</span></tt> (the last 3 properties) are used when instantiating
-the Kafka producer. The properties that are passed when creating the Kafka
+with the prefix <tt class="docutils literal"><span class="pre">kafka.producer</span></tt> are used when instantiating the Kafka producer. The properties that are passed when creating the Kafka
 producer are not limited to the properties given in this example.
-Also it&#8217;s possible include your custom properties here and access them inside
+Also it is possible to include your custom properties here and access them inside
 the preprocessor through the Flume Context object passed in as a method
 argument.</p>
-<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.sink.kafka.KafkaSink</span>
-<span class="na">a1.sinks.k1.topic</span> <span class="o">=</span> <span class="s">mytopic</span>
-<span class="na">a1.sinks.k1.brokerList</span> <span class="o">=</span> <span class="s">localhost:9092</span>
-<span class="na">a1.sinks.k1.requiredAcks</span> <span class="o">=</span> <span class="s">1</span>
-<span class="na">a1.sinks.k1.batchSize</span> <span class="o">=</span> <span class="s">20</span>
-<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span class="s">c1</span>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.sink.kafka.KafkaSink</span>
+<span class="na">a1.sinks.k1.kafka.topic</span> <span class="o">=</span> <span class="s">mytopic</span>
+<span class="na">a1.sinks.k1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">a1.sinks.k1.kafka.flumeBatchSize</span> <span class="o">=</span> <span class="s">20</span>
+<span class="na">a1.sinks.k1.kafka.producer.acks</span> <span class="o">=</span> <span class="s">1</span>
+<span class="na">a1.sinks.k1.kafka.producer.linger.ms</span> <span class="o">=</span> <span class="s">1</span>
+<span class="na">a1.sinks.ki.kafka.producer.compression.type</span> <span class="o">=</span> <span class="s">snappy</span>
+</pre></div>
+</div>
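+<p>For instance, other Kafka producer settings can be passed through the same <tt class="docutils literal"><span class="pre">kafka.producer</span></tt> prefix (a minimal sketch; the values are illustrative):</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="c"># any valid Kafka 0.9 producer property can be supplied this way</span>
+<span class="na">a1.sinks.k1.kafka.producer.retries</span> <span class="o">=</span> <span class="s">3</span>
+<span class="na">a1.sinks.k1.kafka.producer.max.in.flight.requests.per.connection</span> <span class="o">=</span> <span class="s">1</span>
+</pre></div>
+</div>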
+<p><strong>Security and Kafka Sink:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.</p>
+<p>As of now data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span class="pre">kafka.producer.security.protocol</span></tt> to any of the following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external" href="http://kafka.apache.org/documentation#security_overview">Kafka security overview</a>
+and the jira for tracking this issue:
+<a class="reference external" href="https://issues.apache.org/jira/browse/KAFKA-2561">KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Sink:</strong></p>
+<p>Please read the steps described in <a class="reference external" href="http://kafka.apache.org/documentation#security_configclients">Configuring Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.</p>
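+<p>A minimal sketch of such fine tuning, assuming a JKS truststore and TLSv1.2 (both values are illustrative):</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.sinks.sink1.kafka.producer.ssl.truststore.type</span> <span class="o">=</span> <span class="s">JKS</span>
+<span class="na">a1.sinks.sink1.kafka.producer.ssl.enabled.protocols</span> <span class="o">=</span> <span class="s">TLSv1.2</span>
+</pre></div>
+</div>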
+<p>Example configuration with server-side authentication and data encryption.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> <span class="o">=</span> <span class="s">SSL</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span> <span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span> <span class="o">=</span> <span class="s">&lt;password to access the truststore&gt;</span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following property:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm</span> <span class="o">=</span> <span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server&#8217;s fully qualified domain name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external" href="https://tools.ietf.org/html/rfc6125#section-2.3">https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external" href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6">https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client-side authentication is also required, the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.kafka.producer.ssl.keystore.location</span> <span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.keystore.password</span> <span class="o">=</span> <span class="s">&lt;password to access the keystore&gt;</span>
+</pre></div>
+</div>
+<p>If the keystore and the key use different passwords, the <tt class="docutils literal"><span class="pre">ssl.key.password</span></tt> property will
+provide the required additional secret for the producer keystore:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.kafka.producer.ssl.key.password</span> <span class="o">=</span> <span class="s">&lt;password to access the key&gt;</span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Sink:</strong></p>
+<p>To use the Kafka sink with a Kafka cluster secured with Kerberos, set the <tt class="docutils literal"><span class="pre">kafka.producer.security.protocol</span></tt> property noted above for the producer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file&#8217;s &#8220;KafkaClient&#8221; section. The &#8220;Client&#8221; section describes the Zookeeper connection, if needed.
+See the <a class="reference external" href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig">Kafka doc</a>
+for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">JAVA_OPTS</span><span class="o">=</span><span class="s">&quot;$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf&quot;</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span class="s">&quot;$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf&quot;</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> <span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span> <span class="o">=</span> <span class="s">GSSAPI</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span> <span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> <span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span> <span class="o">=</span> <span class="s">GSSAPI</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span> <span class="o">=</span> <span class="s">kafka</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span> <span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span> <span class="o">=</span> <span class="s">&lt;password to access the truststore&gt;</span>
+</pre></div>
+</div>
+<p>Sample JAAS file. For reference on its contents, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of <a class="reference external" href="http://kafka.apache.org/documentation#security_sasl_clientconfig">SASL configuration</a>.
+Unlike for the Kafka Source or Kafka Channel, a &#8220;Client&#8221; section is not required, unless it is needed by other connecting components. Also please make sure
+that the operating system user of the Flume processes has read privileges on the jaas and keytab files.</p>
+<div class="highlight-javascript"><div class="highlight"><pre><span class="nx">KafkaClient</span> <span class="p">{</span>
+  <span class="nx">com</span><span class="p">.</span><span class="nb">sun</span><span class="p">.</span><span class="nx">security</span><span class="p">.</span><span class="nx">auth</span><span class="p">.</span><span class="nx">module</span><span class="p">.</span><span class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+  <span class="nx">useKeyTab</span><span class="o">=</span><span class="kc">true</span>
+  <span class="nx">storeKey</span><span class="o">=</span><span class="kc">true</span>
+  <span class="nx">keyTab</span><span class="o">=</span><span class="s2">&quot;/path/to/keytabs/flume.keytab&quot;</span>
+  <span class="nx">principal</span><span class="o">=</span><span class="s2">&quot;flume/flumehost1.example.com@YOURKERBEROSREALM&quot;</span><span class="p">;</span>
+<span class="p">};</span>
 </pre></div>
 </div>
 </div>
@@ -4101,16 +4629,29 @@ READ_COMMITTED, SERIALIZABLE, REPEATABLE
 <h4>Kafka Channel<a class="headerlink" href="#kafka-channel" title="Permalink to this headline">¶</a></h4>
 <p>The events are stored in a Kafka cluster (must be installed separately). Kafka provides high availability and
 replication, so in case an agent or a Kafka broker crashes, the events are immediately available to other sinks.</p>
-<p>The Kafka channel can be used for multiple scenarios:
-* With Flume source and sink - it provides a reliable and highly available channel for events
-* With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
-* With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sources such as HDFS, HBase or Solr</p>
+<p>The Kafka channel can be used for multiple scenarios:</p>
+<ol class="arabic simple">
+<li>With Flume source and sink - it provides a reliable and highly available channel for events</li>
+<li>With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps (see the sketch after this list)</li>
+<li>With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr</li>
+</ol>
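+<p>A minimal sketch of the second scenario, writing source events into a Kafka topic with no sink attached (the source and its settings are illustrative; downstream apps can decode the records using org.apache.flume.source.avro.AvroFlumeEvent from the flume-ng-sdk artifact, as noted in the table below):</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="c"># the source feeds the Kafka channel; no sink is configured</span>
+<span class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span>
+<span class="na">a1.channels</span> <span class="o">=</span> <span class="s">k1</span>
+<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">netcat</span>
+<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">localhost</span>
+<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span class="s">44444</span>
+<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">k1</span>
+<span class="na">a1.channels.k1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.k1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9092</span>
+<span class="na">a1.channels.k1.kafka.topic</span> <span class="o">=</span> <span class="s">events-for-other-apps</span>
+</pre></div>
+</div>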
+<p>This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of
+the channel has changed compared to previous Flume versions.</p>
+<p>The configuration parameters are organized as follows (see the sketch after this list):</p>
+<ol class="arabic simple">
+<li>Configuration values related to the channel generically are applied at the channel config level, e.g. a1.channels.k1.type</li>
+<li>Configuration values related to Kafka or to how the channel operates are prefixed with &#8220;kafka.&#8221; (these are analogous to the Kafka Common Client Configs), e.g. a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the HDFS sink operates</li>
+<li>Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer</li>
+<li>Where possible, the Kafka parameter names are used, e.g. bootstrap.servers and acks</li>
+</ol>
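+<p>A minimal sketch of the four levels (the channel name k1 is illustrative):</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="c"># 1. generic channel-level value</span>
+<span class="na">a1.channels.k1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="c"># 2. kafka.-prefixed values, analogous to the common client configs</span>
+<span class="na">a1.channels.k1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.k1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9092</span>
+<span class="c"># 3./4. producer/consumer-specific values, using the Kafka parameter names</span>
+<span class="na">a1.channels.k1.kafka.producer.acks</span> <span class="o">=</span> <span class="s">1</span>
+<span class="na">a1.channels.k1.kafka.consumer.auto.offset.reset</span> <span class="o">=</span> <span class="s">earliest</span>
+</pre></div>
+</div>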
+<p>This version of Flume is backwards-compatible with previous versions; however, deprecated properties are indicated in the table below and a warning message
+is logged on startup when they are present in the configuration file.</p>
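+<p>For example, a legacy configuration line and its current equivalent (a minimal sketch):</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="c"># deprecated: still accepted, but logs a warning at startup</span>
+<span class="na">a1.channels.k1.brokerList</span> <span class="o">=</span> <span class="s">kafka-1:9092</span>
+<span class="c"># preferred</span>
+<span class="na">a1.channels.k1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9092</span>
+</pre></div>
+</div>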
 <p>Required properties are in <strong>bold</strong>.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="14%" />
-<col width="16%" />
-<col width="70%" />
+<col width="19%" />
+<col width="13%" />
+<col width="68%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -4123,49 +4664,110 @@ replication, so in case an agent or a ka
 <td>&#8211;</td>
 <td>The component type name, needs to be <tt class="docutils literal"><span class="pre">org.apache.flume.channel.kafka.KafkaChannel</span></tt></td>
 </tr>
-<tr class="row-odd"><td><strong>brokerList</strong></td>
+<tr class="row-odd"><td><strong>kafka.bootstrap.servers</strong></td>
 <td>&#8211;</td>
 <td>List of brokers in the Kafka cluster used by the channel
 This can be a partial list of brokers, but we recommend at least two for HA.
 The format is comma separated list of hostname:port</td>
 </tr>
-<tr class="row-even"><td><strong>zookeeperConnect</strong></td>
-<td>&#8211;</td>
-<td>URI of ZooKeeper used by Kafka cluster
-The format is comma separated list of hostname:port. If chroot is used, it is added once at the end.
-For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka</td>
-</tr>
-<tr class="row-odd"><td>topic</td>
+<tr class="row-even"><td>kafka.topic</td>
 <td>flume-channel</td>
 <td>Kafka topic which the channel will use</td>
 </tr>
-<tr class="row-even"><td>groupId</td>
+<tr class="row-odd"><td>kafka.consumer.group.id</td>
 <td>flume</td>
 <td>Consumer group ID the channel uses to register with Kafka.
 Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data
 Note that having non-channel consumers with the same ID can lead to data loss.</td>
 </tr>
-<tr class="row-odd"><td>parseAsFlumeEvent</td>
+<tr class="row-even"><td>parseAsFlumeEvent</td>
 <td>true</td>
 <td>Expecting Avro datums with FlumeEvent schema in the channel.
-This should be true if Flume source is writing to the channel
-And false if other producers are writing into the topic that the channel is using
-Flume source messages to Kafka can be parsed outside of Flume by using
+This should be true if Flume source is writing to the channel and false if other producers are
+writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
 org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact</td>
 </tr>
-<tr class="row-even"><td>readSmallestOffset</td>
+<tr class="row-odd"><td>migrateZookeeperOffsets</td>
+<td>true</td>
+<td>When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
+This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set
+to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset
+configuration defines how offsets are handled.</td>
+</tr>
+<tr class="row-even"><td>pollTimeout</td>
+<td>500</td>
+<td>The amount of time (in milliseconds) to wait in the &#8220;poll()&#8221; call of the consumer.
+<a class="reference external" href="https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)</a></td>
+</tr>
+<tr class="row-odd"><td>defaultPartitionId</td>
+<td>&#8211;</td>
+<td>Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
+overridden by <tt class="docutils literal"><span class="pre">partitionIdHeader</span></tt>. By default, if this property is not set, events will be
+distributed by the Kafka Producer&#8217;s partitioner - including by <tt class="docutils literal"><span class="pre">key</span></tt> if specified (or by a
+partitioner specified by <tt class="docutils literal"><span class="pre">kafka.partitioner.class</span></tt>).</td>
+</tr>
+<tr class="row-even"><td>partitionIdHeader</td>
+<td>&#8211;</td>
+<td>When set, the producer will take the value of the field named using the value of this property
+from the event header and send the message to the specified partition of the topic. If the
+value represents an invalid partition, the event will not be accepted into the channel. If the header value
+is present then this setting overrides <tt class="docutils literal"><span class="pre">defaultPartitionId</span></tt>.</td>
+</tr>
+<tr class="row-odd"><td>kafka.consumer.auto.offset.reset</td>
+<td>latest</td>
+<td>What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
+(e.g. because that data has been deleted):
+earliest: automatically reset the offset to the earliest offset
+latest: automatically reset the offset to the latest offset
+none: throw exception to the consumer if no previous offset is found for the consumer&#8217;s group
+anything else: throw exception to the consumer.</td>
+</tr>
+<tr class="row-even"><td>kafka.producer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td>kafka.consumer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Same as kafka.producer.security.protocol but for reading/consuming from Kafka.</td>
+</tr>
+<tr class="row-even"><td><em>more producer/consumer security props</em></td>
+<td>&nbsp;</td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference external" href="http://kafka.apache.org/documentation.html#security">Kafka security</a> for additional
+properties that need to be set on producer/consumer.</td>
+</tr>
+</tbody>
+</table>
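+<p>For example, to pin all events to partition 0 unless an event header overrides it (a minimal sketch; the header name <tt class="docutils literal"><span class="pre">partition</span></tt> is illustrative):</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.defaultPartitionId</span> <span class="o">=</span> <span class="s">0</span>
+<span class="c"># an event carrying header partition=2 goes to partition 2 instead</span>
+<span class="na">a1.channels.channel1.partitionIdHeader</span> <span class="o">=</span> <span class="s">partition</span>
+</pre></div>
+</div>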
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="19%" />
+<col width="15%" />
+<col width="66%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>brokerList</td>
+<td>&#8211;</td>
+<td>Use kafka.bootstrap.servers</td>
+</tr>
+<tr class="row-odd"><td>topic</td>
+<td>flume-channel</td>
+<td>Use kafka.topic</td>
+</tr>
+<tr class="row-even"><td>groupId</td>
+<td>flume</td>
+<td>Use kafka.consumer.group.id</td>
+</tr>
+<tr class="row-odd"><td>readSmallestOffset</td>
 <td>false</td>
-<td>When set to true, the channel will read all data in the topic, starting from the oldest event
-when false, it will read only events written after the channel started
-When &#8220;parseAsFlumeEvent&#8221; is true, this will be false. Flume source will start prior to the sinks and this
-guarantees that events sent by source before sinks start will not be lost.</td>
-</tr>
-<tr class="row-odd"><td>Other Kafka Properties</td>
-<td>&#8211;</td>
-<td>These properties are used to configure the Kafka Producer and Consumer used by the channel.
-Any property supported by Kafka can be used.
-The only requirement is to prepend the property name with the prefix <tt class="docutils literal"><span class="pre">kafka.</span></tt>.
-For example: kafka.producer.type</td>
+<td>Use kafka.consumer.auto.offset.reset</td>
 </tr>
 </tbody>
 </table>
@@ -4174,12 +4776,135 @@ For example: kafka.producer.type</td>
 <p class="last">Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up</p>
 </div>
 <p>Example for agent named a1:</p>
-<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span>   <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
-<span class="na">a1.channels.channel1.capacity</span> <span class="o">=</span> <span class="s">10000</span>
-<span class="na">a1.channels.channel1.transactionCapacity</span> <span class="o">=</span> <span class="s">1000</span>
-<span class="na">a1.channels.channel1.brokerList</span><span class="o">=</span><span class="s">kafka-2:9092,kafka-3:9092</span>
-<span class="na">a1.channels.channel1.topic</span><span class="o">=</span><span class="s">channel1</span>
-<span class="na">a1.channels.channel1.zookeeperConnect</span><span class="o">=</span><span class="s">kafka-1:2181</span>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9092,kafka-2:9092,kafka-3:9092</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span class="o">=</span> <span class="s">flume-consumer</span>
+</pre></div>
+</div>
+<p><strong>Security and Kafka Channel:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.</p>
+<p>As of now data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span class="pre">kafka.producer|consumer.security.protocol</span></tt> to any of the following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external" href="http://kafka.apache.org/documentation#security_overview">Kafka security overview</a>
+and the jira for tracking this issue:
+<a class="reference external" href="https://issues.apache.org/jira/browse/KAFKA-2561">KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Channel:</strong></p>
+<p>Please read the steps described in <a class="reference external" href="http://kafka.apache.org/documentation#security_configclients">Configuring Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.</p>
+<p>Example configuration with server-side authentication and data encryption.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> <span class="o">=</span> <span class="s">SSL</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span> <span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span> <span class="o">=</span> <span class="s">&lt;password to access the truststore&gt;</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> <span class="o">=</span> <span class="s">SSL</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span> <span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.password</span> <span class="o">=</span> <span class="s">&lt;password to access the truststore&gt;</span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm</span> <span class="o">=</span> <span class="s">HTTPS</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm</span> <span class="o">=</span> <span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server&#8217;s fully qualified domain name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external" href="https://tools.ietf.org/html/rfc6125#section-2.3">https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external" href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6">https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client-side authentication is also required, the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.kafka.producer.ssl.keystore.location</span> <span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.keystore.password</span> <span class="o">=</span> <span class="s">&lt;password to access the keystore&gt;</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.location</span> <span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.password</span> <span class="o">=</span> <span class="s">&lt;password to access the keystore&gt;</span>
+</pre></div>
+</div>
+<p>If the keystore and the key use different passwords, the <tt class="docutils literal"><span class="pre">ssl.key.password</span></tt> property will
+provide the required additional secret for both the consumer and producer keystores:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.kafka.producer.ssl.key.password</span> <span class="o">=</span> <span class="s">&lt;password to access the key&gt;</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.key.password</span> <span class="o">=</span> <span class="s">&lt;password to access the key&gt;</span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Channel:</strong></p>
+<p>To use the Kafka channel with a Kafka cluster secured with Kerberos, set the <tt class="docutils literal"><span class="pre">producer/consumer.security.protocol</span></tt> properties noted above for the producer and/or consumer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file&#8217;s &#8220;KafkaClient&#8221; section. The &#8220;Client&#8221; section describes the Zookeeper connection, if needed.
+See the <a class="reference external" href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig">Kafka doc</a>
+for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">JAVA_OPTS</span><span class="o">=</span><span class="s">&quot;$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf&quot;</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span class="s">&quot;$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf&quot;</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> <span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span> <span class="o">=</span> <span class="s">GSSAPI</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span> <span class="o">=</span> <span class="s">kafka</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> <span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span> <span class="o">=</span> <span class="s">GSSAPI</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span> <span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> <span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span> <span class="o">=</span> <span class="s">GSSAPI</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span> <span class="o">=</span> <span class="s">kafka</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span> <span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span> <span class="o">=</span> <span class="s">&lt;password to access the truststore&gt;</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> <span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span> <span class="o">=</span> <span class="s">GSSAPI</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span> <span class="o">=</span> <span class="s">kafka</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span> <span class="o">=</span> <span class="s">/path/to/truststore.jks</span>

[... 193 lines stripped ...]


