flume-commits mailing list archives

From bes...@apache.org
Subject flume git commit: FLUME-2971. Add secure Kafka Sink/Source/Channel setup to the User Guide
Date Mon, 10 Oct 2016 18:49:07 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 585c4c92e -> 2fd0d2572


FLUME-2971. Add secure Kafka Sink/Source/Channel setup to the User Guide

The User Guide already has details about configuring Kafka channel to work with a kerberized Kafka cluster.

This patch adds similar description for Kafka Sink and Kafka Source.

Reviewers: Tristan Stevens, Mike Percy, Bessenyei Balázs Donát

(Attila Simon via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2fd0d257
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2fd0d257
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2fd0d257

Branch: refs/heads/trunk
Commit: 2fd0d2572ceb2bc0138c880a3b763647349b64f4
Parents: 585c4c9
Author: Attila Simon <sati@cloudera.com>
Authored: Mon Oct 10 20:48:06 2016 +0200
Committer: Bessenyei Balázs Donát <bessbd@apache.org>
Committed: Mon Oct 10 20:48:06 2016 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst | 563 +++++++++++++++++++++++-----
 1 file changed, 471 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2fd0d257/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 25db777..71fd32e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1240,44 +1240,45 @@ Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the same Consumer Group
 so each will read a unique set of partitions for the topics.
 
-
-
-===============================  ===========  ===================================================
-Property Name                    Default      Description
-===============================  ===========  ===================================================
-**channels**                     --
-**type**                         --           The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
-**kafka.bootstrap.servers**      --           List of brokers in the Kafka cluster used by the source
-kafka.consumer.group.id          flume        Unique identified of consumer group. Setting the same id in multiple sources or agents
-                                              indicates that they are part of the same consumer group
-**kafka.topics**                 --           Comma-separated list of topics the kafka consumer will read messages from.
-**kafka.topics.regex**           --           Regex that defines set of topics the source is subscribed on. This property has higher priority
-                                              than ``kafka.topics`` and overrides ``kafka.topics`` if exists.
-batchSize                        1000         Maximum number of messages written to Channel in one batch
-batchDurationMillis              1000         Maximum time (in ms) before a batch will be written to Channel
-                                              The batch will be written whenever the first of size and time will be reached.
-backoffSleepIncrement            1000         Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty.
-                                              Wait period will reduce aggressive pinging of an empty Kafka Topic.  One second is ideal for
-                                              ingestion use cases but a lower value may be required for low latency operations with
-                                              interceptors.
-maxBackoffSleep                  5000         Maximum wait time that is triggered when a Kafka Topic appears to be empty.  Five seconds is
-                                              ideal for ingestion use cases but a lower value may be required for low latency operations
-                                              with interceptors.
-useFlumeEventFormat              false        By default events are taken as bytes from the Kafka topic directly into the event body. Set to
-                                              true to read events as the Flume Avro binary format. Used in conjunction with the same property
-                                              on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
-                                              any Flume headers sent on the producing side.
-migrateZookeeperOffsets          true         When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
-                                              This should be true to support seamless Kafka client migration from older versions of Flume.
-                                              Once migrated this can be set to false, though that should generally not be required.
-                                              If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset
-                                              defines how offsets are handled.
-Other Kafka Consumer Properties  --           These properties are used to configure the Kafka Consumer. Any producer property supported
-                                              by Kafka can be used. The only requirement is to prepend the property name with the prefix
-                                              ``kafka.consumer``.
-                                              For example: ``kafka.consumer.auto.offset.reset``
-                                              Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details
-===============================  ===========  ===================================================
+==================================  ===========  ===================================================
+Property Name                       Default      Description
+==================================  ===========  ===================================================
+**channels**                        --
+**type**                            --           The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
+**kafka.bootstrap.servers**         --           List of brokers in the Kafka cluster used by the source
+kafka.consumer.group.id             flume        Unique identifier of the consumer group. Setting the same id in multiple sources or agents
+                                                 indicates that they are part of the same consumer group
+**kafka.topics**                    --           Comma-separated list of topics the Kafka consumer will read messages from.
+**kafka.topics.regex**              --           Regex that defines the set of topics the source is subscribed to. This property has higher priority
+                                                 than ``kafka.topics`` and overrides ``kafka.topics`` if it exists.
+batchSize                           1000         Maximum number of messages written to Channel in one batch
+batchDurationMillis                 1000         Maximum time (in ms) before a batch will be written to Channel.
+                                                 The batch will be written whenever the first of size or time is reached.
+backoffSleepIncrement               1000         Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty.
+                                                 Wait period will reduce aggressive pinging of an empty Kafka Topic.  One second is ideal for
+                                                 ingestion use cases but a lower value may be required for low latency operations with
+                                                 interceptors.
+maxBackoffSleep                     5000         Maximum wait time that is triggered when a Kafka Topic appears to be empty.  Five seconds is
+                                                 ideal for ingestion use cases but a lower value may be required for low latency operations
+                                                 with interceptors.
+useFlumeEventFormat                 false        By default events are taken as bytes from the Kafka topic directly into the event body. Set to
+                                                 true to read events as the Flume Avro binary format. Used in conjunction with the same property
+                                                 on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
+                                                 any Flume headers sent on the producing side.
+migrateZookeeperOffsets             true         When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
+                                                 This should be true to support seamless Kafka client migration from older versions of Flume.
+                                                 Once migrated this can be set to false, though that should generally not be required.
+                                                 If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset
+                                                 defines how offsets are handled.
+                                                 Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details
+kafka.consumer.security.protocol    PLAINTEXT    Set to SASL_PLAINTEXT, SASL_SSL or SSL if connecting to Kafka using some level of security. See below for additional info on secure setup.
+*more consumer security props*                   If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
+                                                 properties that need to be set on the consumer.
+Other Kafka Consumer Properties     --           These properties are used to configure the Kafka Consumer. Any consumer property supported
+                                                 by Kafka can be used. The only requirement is to prepend the property name with the prefix
+                                                 ``kafka.consumer``.
+                                                 For example: ``kafka.consumer.auto.offset.reset``
+==================================  ===========  ===================================================
 
 .. note:: The Kafka Source overrides two Kafka consumer parameters:
           auto.commit.enable is set to "false" by the source and every batch is committed. Kafka source guarantees at least once
@@ -1319,6 +1320,142 @@ Example for topic subscription by regex
     # the default kafka.consumer.group.id=flume is used
 
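+As mentioned in the introduction of this section, multiple Kafka Sources can share the same Consumer Group so that each reads a distinct subset of the topic's partitions. A hypothetical sketch with two such sources on one agent; component names, topic and broker list are illustrative placeholders:
+
+.. code-block:: properties
+
+    # two sources in the same consumer group split the partitions of mytopic between them
+    a1.sources = src1 src2
+    a1.sources.src1.type = org.apache.flume.source.kafka.KafkaSource
+    a1.sources.src1.channels = channel1
+    a1.sources.src1.kafka.bootstrap.servers = localhost:9092
+    a1.sources.src1.kafka.topics = mytopic
+    a1.sources.src1.kafka.consumer.group.id = flume-group
+
+    a1.sources.src2.type = org.apache.flume.source.kafka.KafkaSource
+    a1.sources.src2.channels = channel1
+    a1.sources.src2.kafka.bootstrap.servers = localhost:9092
+    a1.sources.src2.kafka.topics = mytopic
+    a1.sources.src2.kafka.consumer.group.id = flume-group
+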
 
+**Security and Kafka Source:**
+
+Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.consumer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_
+    and the jira for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`_
+
+
+**TLS and Kafka Source:**
+
+Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+    a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sources.source1.kafka.topics = mytopic
+    a1.sources.source1.kafka.consumer.group.id = flume-consumer
+    a1.sources.source1.kafka.consumer.security.protocol = SSL
+    a1.sources.source1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sources.source1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following property:
+
+.. code-block:: properties
+
+    a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate by a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    a1.sources.source1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.sources.source1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
+
+If the keystore and key use different password protection then the ``ssl.key.password`` property will
+provide the required additional secret for the consumer keystore:
+
+.. code-block:: properties
+
+    a1.sources.source1.kafka.consumer.ssl.key.password = <password to access the key>
+
+
+**Kerberos and Kafka Source:**
+
+To use Kafka source with a Kafka cluster secured with Kerberos, set the ``consumer.security.protocol`` property noted above for the consumer.
+The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed.
+See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
+for information on the JAAS file contents. The location of this JAAS file and optionally the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
+
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
+
+Example secure configuration using SASL_PLAINTEXT:
+
+.. code-block:: properties
+
+    a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sources.source1.kafka.topics = mytopic
+    a1.sources.source1.kafka.consumer.group.id = flume-consumer
+    a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
+    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
+
+Example secure configuration using SASL_SSL:
+
+.. code-block:: properties
+
+    a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sources.source1.kafka.topics = mytopic
+    a1.sources.source1.kafka.consumer.group.id = flume-consumer
+    a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
+    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
+    a1.sources.source1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sources.source1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
+
+
+Sample JAAS file. For a reference of its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_.
+Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example.
+This won't be needed unless you require offset migration, or you require this section for other secure components.
+Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
+
+.. code-block:: javascript
+
+    Client {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
+
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
+
+
 NetCat Source
 ~~~~~~~~~~~~~
 
@@ -2564,38 +2701,41 @@ This version of Flume no longer supports Older Versions (0.8.x) of Kafka.
 Required properties are marked in bold font.
 
 
-===============================  ===================  =============================================================================================
-Property Name                    Default              Description
-===============================  ===================  =============================================================================================
-**type**                         --                   Must be set to ``org.apache.flume.sink.kafka.KafkaSink``
-**kafka.bootstrap.servers**      --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
-                                                      This can be a partial list of brokers, but we recommend at least two for HA.
-                                                      The format is comma separated list of hostname:port
-kafka.topic                      default-flume-topic  The topic in Kafka to which the messages will be published. If this parameter is configured,
-                                                      messages will be published to this topic.
-                                                      If the event header contains a "topic" field, the event will be published to that topic
-                                                      overriding the topic configured here.
-flumeBatchSize                   100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
-kafka.producer.acks              1                    How many replicas must acknowledge a message before its considered successfully written.
-                                                      Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)
-                                                      Set this to -1 to avoid data loss in some cases of leader failure.
-useFlumeEventFormat              false                By default events are put as bytes onto the Kafka topic directly from the event body. Set to
-                                                      true to store events as the Flume Avro binary format. Used in conjunction with the same property
-                                                      on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
-                                                      any Flume headers for the producing side.
-defaultPartitionId               --                   Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
-                                                      overriden by ``partitionIdHeader``. By default, if this property is not set, events will be
-                                                      distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a
-                                                      partitioner specified by ``kafka.partitioner.class``).
-partitionIdHeader                --                   When set, the sink will take the value of the field named using the value of this property
-                                                      from the event header and send the message to the specified partition of the topic. If the
-                                                      value represents an invalid partition, an EventDeliveryException will be thrown. If the header value
-                                                      is present then this setting overrides ``defaultPartitionId``.
-Other Kafka Producer Properties  --                   These properties are used to configure the Kafka Producer. Any producer property supported
-                                                      by Kafka can be used. The only requirement is to prepend the property name with the prefix
-                                                      ``kafka.producer``.
-                                                      For example: kafka.producer.linger.ms
-===============================  ===================  =============================================================================================
+==================================  ===================  =============================================================================================
+Property Name                       Default              Description
+==================================  ===================  =============================================================================================
+**type**                            --                   Must be set to ``org.apache.flume.sink.kafka.KafkaSink``
+**kafka.bootstrap.servers**         --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
+                                                         This can be a partial list of brokers, but we recommend at least two for HA.
+                                                         The format is comma separated list of hostname:port
+kafka.topic                         default-flume-topic  The topic in Kafka to which the messages will be published. If this parameter is configured,
+                                                         messages will be published to this topic.
+                                                         If the event header contains a "topic" field, the event will be published to that topic
+                                                         overriding the topic configured here.
+flumeBatchSize                      100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
+kafka.producer.acks                 1                    How many replicas must acknowledge a message before it is considered successfully written.
+                                                         Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)
+                                                         Set this to -1 to avoid data loss in some cases of leader failure.
+useFlumeEventFormat                 false                By default events are put as bytes onto the Kafka topic directly from the event body. Set to
+                                                         true to store events as the Flume Avro binary format. Used in conjunction with the same property
+                                                         on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
+                                                         any Flume headers for the producing side.
+defaultPartitionId                  --                   Specifies a Kafka partition ID (integer) for all events in this sink to be sent to, unless
+                                                         overridden by ``partitionIdHeader``. By default, if this property is not set, events will be
+                                                         distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a
+                                                         partitioner specified by ``kafka.partitioner.class``).
+partitionIdHeader                   --                   When set, the sink will take the value of the field named using the value of this property
+                                                         from the event header and send the message to the specified partition of the topic. If the
+                                                         value represents an invalid partition, an EventDeliveryException will be thrown. If the header value
+                                                         is present then this setting overrides ``defaultPartitionId``.
+kafka.producer.security.protocol    PLAINTEXT            Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
+*more producer security props*                           If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
+                                                         properties that need to be set on the producer.
+Other Kafka Producer Properties     --                   These properties are used to configure the Kafka Producer. Any producer property supported
+                                                         by Kafka can be used. The only requirement is to prepend the property name with the prefix
+                                                         ``kafka.producer``.
+                                                         For example: kafka.producer.linger.ms
+==================================  ===================  =============================================================================================
 
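+A hypothetical sketch of the ``useFlumeEventFormat`` behaviour described above: enabling it on the producing KafkaSink of one agent and on the consuming KafkaSource of another lets Flume headers survive the trip through Kafka (agent, component and topic names are illustrative, and the snippets are partial):
+
+.. code-block:: properties
+
+    # producing side (agent a1): serialize the whole Flume event, not just the body
+    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.k1.kafka.topic = mytopic
+    a1.sinks.k1.useFlumeEventFormat = true
+
+    # consuming side (agent a2): read the Flume Avro binary format so headers are restored
+    a2.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
+    a2.sources.r1.kafka.topics = mytopic
+    a2.sources.r1.useFlumeEventFormat = true
+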
 .. note::   Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka.
             If ``topic`` exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink.
@@ -2635,6 +2775,132 @@ argument.
     a1.sinks.k1.kafka.producer.linger.ms = 1
     a1.sinks.ki.kafka.producer.compression.type = snappy
 
+
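+One way the ``topic`` header mentioned in the note above can end up on events is to stamp it with an interceptor on the source feeding this sink. A hypothetical sketch using the static interceptor (source, interceptor and topic names are illustrative):
+
+.. code-block:: properties
+
+    # add a "topic" header to every event so the Kafka Sink routes it to that topic
+    a1.sources.r1.interceptors = i1
+    a1.sources.r1.interceptors.i1.type = static
+    a1.sources.r1.interceptors.i1.key = topic
+    a1.sources.r1.interceptors.i1.value = audit-events
+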
+**Security and Kafka Sink:**
+
+Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.producer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_
+    and the jira for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`__
+
+
+**TLS and Kafka Sink:**
+
+Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sinks.sink1.kafka.topic = mytopic
+    a1.sinks.sink1.kafka.producer.security.protocol = SSL
+    a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following property:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate by a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>
+
+If the keystore and key use different password protection then the ``ssl.key.password`` property will
+provide the required additional secret for the producer keystore:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>
+
+
+**Kerberos and Kafka Sink:**
+
+To use Kafka sink with a Kafka cluster secured with Kerberos, set the ``producer.security.protocol`` property noted above for the producer.
+The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed.
+See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
+for information on the JAAS file contents. The location of this JAAS file and optionally the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
+
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
+
+Example secure configuration using SASL_PLAINTEXT:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sinks.sink1.kafka.topic = mytopic
+    a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
+    a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
+
+
+Example secure configuration using SASL_SSL:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sinks.sink1.kafka.topic = mytopic
+    a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
+    a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
+    a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+
+
+Sample JAAS file. For a reference of its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_.
+Unlike the Kafka Source or Kafka Channel, a "Client" section is not required here, unless it is needed by other connecting components. Also please make sure
+that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
+
+.. code-block:: javascript
+
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
+
+
 Custom Sink
 ~~~~~~~~~~~
 
@@ -2792,7 +3058,7 @@ migrateZookeeperOffsets                  true                       When no Kaf
                                                                      This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set
                                                                      to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset
                                                                      configuration defines how offsets are handled.
-pollTimeout                              500                         The amount of time(in milliseconds) to wait in the "poll()" call of the conumer.
+pollTimeout                              500                         The amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
                                                                      https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
 defaultPartitionId                       --                          Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
                                                                      overriden by ``partitionIdHeader``. By default, if this property is not set, events will be
@@ -2808,9 +3074,9 @@ kafka.consumer.auto.offset.reset         latest                     What to do
                                                                      latest: automatically reset the offset to the latest offset
                                                                      none: throw exception to the consumer if no previous offset is found for the consumer\'s group
                                                                      anything else: throw exception to the consumer.
-kafka.producer.security.protocol         PLAINTEXT                   Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using Kerberos. See below for additional info on Kerberos setup.
+kafka.producer.security.protocol         PLAINTEXT                   Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
 kafka.consumer.security.protocol         PLAINTEXT                   Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
-*more producer/consumer security props*                              If using SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
+*more producer/consumer security props*                              If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
                                                                      properties that need to be set on producer/consumer.
 =======================================  ==========================  ===============================================================================================================
 
@@ -2839,43 +3105,156 @@ Example for agent named a1:
     a1.channels.channel1.kafka.topic = channel1
     a1.channels.channel1.kafka.consumer.group.id = flume-consumer
 
+
+**Security and Kafka Channel:**
+
+Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.producer|consumer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_
+    and the jira for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`_
+
+
+**TLS and Kafka Channel:**
+
+Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+    a1.channels.channel1.kafka.producer.security.protocol = SSL
+    a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+    a1.channels.channel1.kafka.consumer.security.protocol = SSL
+    a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties:
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+    a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate by a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
+    a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
+
+If the keystore and key use different password protection then the ``ssl.key.password`` property will
+provide the required additional secret for both the consumer and producer keystores:
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
+    a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>
+
+
 **Kerberos and Kafka Channel:**
 
-To use Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for producer and/or consumer.
-The Kerberos keytab and principal to be used is specified in a JAAS file's "KafkaClient" section. See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
-for info on the JAAS file contents. The location of this JAAS file is specified via JAVA_OPTS using -Djava.security.auth.login.config=/path/to/kafka_jaas.conf (in flume-env.sh)
+To use Kafka channel with a Kafka cluster secured with Kerberos, set the ``producer/consumer.security.protocol`` properties noted above for the producer and/or consumer.
+The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed.
+See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
+for information on the JAAS file contents. The location of this JAAS file and optionally the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
 
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
 
-Sample secure configuration using SASL_PLAINTEXT.
+Example secure configuration using SASL_PLAINTEXT:
 
 .. code-block:: properties
 
     a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
-    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
     a1.channels.channel1.kafka.topic = channel1
     a1.channels.channel1.kafka.consumer.group.id = flume-consumer
     a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
+    a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
     a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
+    a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
 
-Sample JAAS file
+Example secure configuration using SASL_SSL:
 
-.. code-block:: javascript
+.. code-block:: properties
 
-    KafkaClient {
-        com.sun.security.auth.module.Krb5LoginModule required
-        useKeyTab=true
-        storeKey=true
-        serviceName="kafka"
-        keyTab="/path/to/keytabs/testuser1.keytab"
-        principal="testuser1/kafka1.example.com";
-    };
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+    a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
+    a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
+    a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+    a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
+    a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
+    a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
+
+
+Sample JAAS file. For a reference of its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_.
+Since the Kafka Channel may also connect to Zookeeper for offset migration, the "Client" section was also added to this example.
+This won't be needed unless you require offset migration, or you require this section for other secure components.
+Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
 
-Sample flume-env.sh
+.. code-block:: javascript
 
-.. code-block:: properties
+    Client {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
 
-    export JAVA_HOME=/path/java-home/
-    export JAVA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_jaas.conf"
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
 
 
 File Channel

