kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5455; Better Javadocs for the transactional producer and consumer
Date Sat, 17 Jun 2017 21:56:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 d083a156e -> 5b3512166


KAFKA-5455; Better Javadocs for the transactional producer and consumer

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3353 from apurvam/KAFKA-5455-proper-javadocs-eos-clients

(cherry picked from commit b836bd18f99e32ee71e30693853518fdc2061218)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b351216
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b351216
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b351216

Branch: refs/heads/0.11.0
Commit: 5b351216621f52a471c21826d0dec3ce3187e697
Parents: d083a15
Author: Apurva Mehta <apurva@confluent.io>
Authored: Sat Jun 17 14:56:24 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Sat Jun 17 14:56:39 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  35 ++++++
 .../kafka/clients/producer/KafkaProducer.java   | 113 ++++++++++++++++++-
 2 files changed, 143 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b351216/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1d5ff98..155f2e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -418,6 +418,33 @@ import java.util.regex.Pattern;
  * to pause the consumption on the specified assigned partitions and resume the consumption
  * on the specified paused partitions respectively in the future {@link #poll(long)} calls.
  *
+ * <h3>Reading Transactional Messages</h3>
+ *
+ * <p>
+ * Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
+ * In order for this to work, consumers reading from these partitions should be configured to only read committed data.
+ * This can be achieved by setting <code>isolation.level=read_committed</code> in the consumer's configuration.
+ * </p>
+ *
+ * <p>
+ * In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been
+ * successfully committed. It will continue to read non-transactional messages as before. There is no client-side
+ * buffering in <code>read_committed</code> mode. Instead, the end offset of a partition for a <code>read_committed</code>
+ * consumer will be the offset of the first message in the partition belonging to an open transaction. This offset
+ * is known as the 'Last Stable Offset' (LSO).</p>
+ *
+ * <p>A <code>read_committed</code> consumer will only read up to the LSO and filter out any transactional
+ * messages which have been aborted. The LSO also affects the behavior of {@link #seekToEnd(Collection)} and
+ * {@link #endOffsets(Collection)} for <code>read_committed</code> consumers, details of which are in each method's documentation.
+ * Finally, the fetch lag metrics are also adjusted to be relative to the LSO for <code>read_committed</code> consumers.</p>
+ *
+ * <p>Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction.
+ * These markers are not returned to applications, yet have an offset in the log. As a result, applications reading from
+ * topics with transactional messages will see gaps in the consumed offsets. These missing messages are the transaction
+ * markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using
+ * <code>read_committed</code> consumers may also see gaps due to aborted transactions, since those messages will not
+ * be returned by the consumer and yet will have valid offsets.</p>
+ *
  * <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
  *
  * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
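[For readers trying out the isolation level documented above, a minimal consumer sketch follows. It is not part of this commit; the broker address, group id, topic name, and class name are assumed values.]

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "my-group");                // hypothetical group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // The setting this commit documents: only return messages from committed
            // transactions; non-transactional messages are returned as before.
            props.put("isolation.level", "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                ConsumerRecords<String, String> records = consumer.poll(100); // 0.11-era poll(long)
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }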
@@ -1247,6 +1274,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
      * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
      * If no partition is provided, seek to the final offset for all of the currently assigned partitions.
+     *
+     * If <code>isolation.level=read_committed</code>, the end offset will be the Last Stable Offset, i.e. the offset
+     * of the first message with an open transaction.
      */
     public void seekToEnd(Collection<TopicPartition> partitions) {
         acquire();
@@ -1489,6 +1519,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * Notice that this method may block indefinitely if the partition does not exist.
      * This method does not change the current consumer position of the partitions.
+     * </p>
+     *
+     * <p>When <code>isolation.level=read_committed</code>, the last offset will be the Last Stable Offset (LSO).
+     * This is the offset of the first message with an open transaction. The LSO moves forward as transactions
+     * are completed.</p>
      *
      * @see #seekToEnd(Collection)
      *
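[As a companion to the seekToEnd/endOffsets notes in this file, here is a small sketch of where the LSO surfaces in the consumer API. It is not part of the commit; the topic, partition, class, and helper names are hypothetical.]

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class LsoProbeSketch {
        // Assumes a consumer constructed with isolation.level=read_committed,
        // as in the previous sketch.
        static void printLso(KafkaConsumer<String, String> consumer) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // assumed topic and partition
            consumer.assign(Collections.singleton(tp));

            // For a read_committed consumer this returns the Last Stable Offset (LSO): the offset
            // of the first message in the partition still belonging to an open transaction.
            Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
            System.out.println("end offset (LSO) = " + end.get(tp));

            // seekToEnd() evaluates lazily; position() forces the seek and resolves to the LSO.
            consumer.seekToEnd(Collections.singleton(tp));
            System.out.println("position = " + consumer.position(tp));
        }
    }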

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b351216/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 679e0ea..11df0e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -96,7 +96,7 @@ import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.e
  * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  *
  * Producer<String, String> producer = new KafkaProducer<>(props);
- * for(int i = 0; i < 100; i++)
+ * for (int i = 0; i < 100; i++)
 *     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
  *
  * producer.close();
@@ -139,9 +139,88 @@ import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.e
  * their <code>ProducerRecord</code> into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or
  * {@link org.apache.kafka.common.serialization.StringSerializer} for simple string or byte types.
  * <p>
+ * From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer.
+ * The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular,
+ * producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages
+ * to multiple partitions (and topics!) atomically.
+ * </p>
+ * <p>
+ * To enable idempotence, the <code>enable.idempotence</code> configuration must be set to true. If set, the
+ * <code>retries</code> config will be defaulted to <code>Integer.MAX_VALUE</code>, the
+ * <code>max.in.flight.requests.per.connection</code> config will be defaulted to <code>1</code>,
+ * and the <code>acks</code> config will be defaulted to <code>all</code>. There are no API changes for the idempotent
+ * producer, so existing applications will not need to be modified to take advantage of this feature.
+ * </p>
+ * <p>
+ * To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot
+ * be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the <code>retries</code>
+ * config unset, as it will be defaulted to <code>Integer.MAX_VALUE</code>. Additionally, if a {@link #send(ProducerRecord)}
+ * returns an error even with infinite retries (for instance if the message expires in the buffer before being sent),
+ * then it is recommended to shut down the producer and check the contents of the last produced message to ensure that
+ * it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.
+ * </p>
+ * <p>To use the transactional producer and the attendant APIs, you must set the <code>transactional.id</code>
+ * configuration property. If the <code>transactional.id</code> is set, idempotence is automatically enabled along with
+ * the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured
+ * for durability. In particular, the <code>replication.factor</code> should be at least <code>3</code>, and the
+ * <code>min.insync.replicas</code> for these topics should be set to <code>2</code>. Finally, in order for transactional guarantees
+ * to be realized from end-to-end, the consumers must be configured to read only committed messages as well.
+ * </p>
+ * <p>
+ * The purpose of the <code>transactional.id</code> is to enable transaction recovery across multiple sessions of a
+ * single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful application.
+ * As such, it should be unique to each producer instance running within a partitioned application.
+ * </p>
+ * <p>All the new transactional APIs are blocking and will throw exceptions on failure. The example
+ * below illustrates how the new APIs are meant to be used. It is similar to the example above, except that all
+ * 100 messages are part of a single transaction.
+ * </p>
+ * <p>
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("bootstrap.servers", "localhost:9092");
+ * props.put("transactional.id", "my-transactional-id");
+ * Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+ *
+ * producer.initTransactions();
+ *
+ * try {
+ *     producer.beginTransaction();
+ *     for (int i = 0; i < 100; i++)
+ *         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
+ *     producer.commitTransaction();
+ * } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+ *     // We can't recover from these exceptions, so our only option is to close the producer and exit.
+ *     producer.close();
+ * } catch (KafkaException e) {
+ *     // For all other exceptions, just abort the transaction and try again.
+ *     producer.abortTransaction();
+ * }
+ * producer.close();
+ * } </pre>
+ * </p>
+ * <p>
+ * As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the
+ * {@link #beginTransaction()} and {@link #commitTransaction()} calls will be part of a single transaction. When the
+ * <code>transactional.id</code> is specified, all messages sent by the producer must be part of a transaction.
+ * </p>
+ * <p>
+ * The transactional producer uses exceptions to communicate error states. In particular, it is not required
+ * to specify callbacks for <code>producer.send()</code> or to call <code>.get()</code> on the returned Future: a
+ * <code>KafkaException</code> would be thrown if any of the
+ * <code>producer.send()</code> or transactional calls hit an irrecoverable error during a transaction. See the {@link #send(ProducerRecord)}
+ * documentation for more details about detecting errors from a transactional send.
+ * documentation for more details about detecting errors from a transactional send.
+ * </p>
+ * <p>By calling
+ * <code>producer.abortTransaction()</code> upon receiving a <code>KafkaException</code> we can ensure that any
+ * successful writes are marked as aborted, hence keeping the transactional guarantees.
+ * </p>
+ * <p>
  * This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support
- * certain client features.  You will receive an UnsupportedVersionException when invoking an API that is not available
- * with the running broker verion.
+ * certain client features.  For instance, the transactional APIs need broker versions 0.11.0 or later. You will receive an
+ * <code>UnsupportedVersionException</code> when invoking an API that is not available in the running broker version.
+ * </p>
  */
 public class KafkaProducer<K, V> implements Producer<K, V> {
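[The class-level Javadoc above describes the idempotence switch; the following sketch shows it in use. It is not part of this commit; the broker address, topic, and class name are assumed values.]

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class IdempotentProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // The single switch described above; retries, max.in.flight.requests.per.connection,
            // and acks are then defaulted to Integer.MAX_VALUE, 1, and "all" respectively.
            props.put("enable.idempotence", "true");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Internal retries can no longer introduce duplicates in the log.
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            }
        }
    }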
 
@@ -487,7 +566,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     /**
-     * Commits the ongoing transaction.
+     * Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction.
+     *
+     * Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable
+     * errors, this method will throw the last received exception immediately and the transaction will not be committed.
+     * So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
      *
      * @throws ProducerFencedException if another producer with the same
      *         transactional.id is active.
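[A brief sketch of the commit semantics documented in this hunk, not part of the commit itself; the topic, class, and helper names are hypothetical. Since commitTransaction() flushes unsent records and rethrows the last irrecoverable send error, the catch blocks mirror the transactional example in the class Javadoc: fenced, sequence, and authorization errors are fatal, everything else is abortable.]

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    public class CommitSketch {
        // Assumes a producer constructed with a transactional.id and already
        // initialized via initTransactions().
        static void sendAtomically(Producer<String, String> producer) {
            try {
                producer.beginTransaction();
                // No callback or Future.get() needed: a failed send resurfaces below.
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                // Flushes unsent records, then commits; if any send hit an irrecoverable
                // error, this throws that exception and the transaction is not committed.
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                producer.close(); // fatal: aborting cannot help here
            } catch (KafkaException e) {
                producer.abortTransaction(); // abortable: nothing was committed, safe to retry
            }
        }
    }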
@@ -501,7 +584,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     /**
-     * Aborts the ongoing transaction.
+     * Aborts the ongoing transaction. Any unflushed produce messages will be aborted when this call is made.
+     * This call will throw an exception immediately if any prior {@link #send(ProducerRecord)} calls failed with a
+     * {@link ProducerFencedException} or an instance of {@link org.apache.kafka.common.errors.AuthorizationException}.
      *
      * @throws ProducerFencedException if another producer with the same
      *         transactional.id is active.
@@ -581,6 +666,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * }
      * </pre>
      * <p>
+     * When used as part of a transaction, it is not necessary to define a callback or check the result of the future
+     * in order to detect errors from <code>send</code>. If any of the send calls failed with an irrecoverable error,
+     * the final {@link #commitTransaction()} call will fail and throw the exception from the last failed send. When
+     * this happens, your application should call {@link #abortTransaction()} to reset the state and continue to send
+     * data.
+     * </p>
+     * <p>
+     * Some transactional send errors cannot be resolved with a call to {@link #abortTransaction()}. In particular,
+     * if a transactional send finishes with a {@link ProducerFencedException}, a {@link org.apache.kafka.common.errors.OutOfOrderSequenceException},
+     * or any {@link org.apache.kafka.common.errors.AuthorizationException}, then the only option left is to call {@link #close()}.
+     * </p>
+     * <p>
      * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
      * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
      * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
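[To illustrate the Executor recommendation in the surrounding context lines, one more sketch that is not part of the commit; the pool, topic, class, and helper names are hypothetical.]

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CallbackHandoffSketch {
        private static final ExecutorService CALLBACK_POOL = Executors.newSingleThreadExecutor(); // hypothetical pool

        static void sendWithSlowCompletionWork(Producer<String, String> producer) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                // This callback runs in the producer's I/O thread: keep it cheap and
                // hand any expensive completion work to our own executor.
                CALLBACK_POOL.execute(() -> {
                    if (exception != null)
                        exception.printStackTrace();
                    else
                        System.out.println("acked at offset " + metadata.offset());
                });
            });
        }
    }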
@@ -788,6 +885,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *
      * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
      * we need to set <code>retries=&lt;large_number&gt;</code> in our config.
+     * </p>
+     * <p>
+     * Applications don't need to call this method for transactional producers, since {@link #commitTransaction()} will
+     * flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
+     * calls made since the previous {@link #beginTransaction()} are completed before the commit.
+     * </p>
      *
      * @throws InterruptException If the thread is interrupted while blocked
      */

