kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: MINOR: Logging/debugging improvements for transactions
Date: Fri, 02 Jun 2017 02:09:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 0be4d1af0 -> 4424534e9


MINOR: Logging/debugging improvements for transactions

Author: Jason Gustafson <jason@confluent.io>
Author: Apurva Mehta <apurva.1618@gmail.com>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3185 from hachikuji/minor-transaction-logging-improvements

(cherry picked from commit 0c3e466eb035859659ce41404f3b71b577467dca)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4424534e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4424534e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4424534e

Branch: refs/heads/0.11.0
Commit: 4424534e99a92f54a873771dc3f05f23f3487669
Parents: 0be4d1a
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Jun 1 19:08:32 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Jun 1 19:08:52 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 11 ++-
 .../producer/internals/ProducerIdAndEpoch.java  |  5 ++
 .../producer/internals/RecordAccumulator.java   |  6 +-
 .../clients/producer/internals/Sender.java      | 35 ++++-----
 .../producer/internals/TransactionManager.java  | 74 +++++++++-----------
 .../requests/AddOffsetsToTxnResponse.java       |  9 +++
 .../requests/AddPartitionsToTxnResponse.java    |  9 +++
 .../kafka/common/requests/EndTxnResponse.java   |  8 +++
 .../kafka/common/requests/FetchResponse.java    |  7 +-
 .../requests/FindCoordinatorResponse.java       |  4 +-
 .../common/requests/InitProducerIdResponse.java |  9 +++
 .../requests/TxnOffsetCommitResponse.java       |  8 +++
 .../transaction/TransactionCoordinator.scala    | 13 ++--
 core/src/main/scala/kafka/log/Log.scala         |  7 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 36 ++++++++--
 15 files changed, 157 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c2beff8..defbbb7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -223,6 +223,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                                 TopicPartition partition = entry.getKey();
                                 long fetchOffset = request.fetchData().get(partition).fetchOffset;
                                 FetchResponse.PartitionData fetchData = entry.getValue();
+
+                                log.debug("Fetch at offset {} for partition {} returned fetch data {}", fetchOffset,
+                                        partition, fetchData);
+
                                 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                         resp.requestHeader().apiVersion()));
                             }
@@ -232,7 +236,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
                         @Override
                         public void onFailure(RuntimeException e) {
-                            log.debug("Fetch request to {} for partitions {} failed", fetchTarget, request.fetchData().keySet(), e);
+                            log.debug("Fetch request {} to {} failed", request.fetchData(), fetchTarget, e);
                         }
                     });
         }
@@ -792,8 +796,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         Map<Node, FetchRequest.Builder> requests = new HashMap<>();
         for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             Node node = entry.getKey();
-            FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, entry.getValue(), isolationLevel).
-                    setMaxBytes(this.maxBytes);
+            FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes,
+                    entry.getValue(), isolationLevel)
+                    .setMaxBytes(this.maxBytes);
             requests.put(node, fetch);
         }
         return requests;
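
A side note on the onFailure logging above: slf4j treats a trailing Throwable argument
specially, so the exception's stack trace is still printed even though the format string
has only two placeholders. A minimal standalone sketch of that behavior (class name and
argument values are illustrative, not part of this patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TrailingThrowableExample {
        private static final Logger log = LoggerFactory.getLogger(TrailingThrowableExample.class);

        public static void main(String[] args) {
            RuntimeException e = new RuntimeException("connection to broker lost");
            // Two placeholders but three arguments: slf4j binds the first two and,
            // because the final argument is a Throwable, appends its stack trace.
            log.debug("Fetch request {} to {} failed", "{tp-0=(offset=42)}", "node-1", e);
        }
    }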

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
index 01d5e86..293bb51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
@@ -33,4 +33,9 @@ class ProducerIdAndEpoch {
     public boolean isValid() {
         return NO_PRODUCER_ID < producerId;
     }
+
+    @Override
+    public String toString() {
+        return "(producerId=" + producerId + ", epoch=" + epoch + ")";
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 3f9f4b1..2c4917d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -488,9 +488,9 @@ public final class RecordAccumulator {
                                            // the producer id and sequence here, this attempt will also be accepted,
                                             // causing a duplicate.
                                             int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
-                                            log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
-                                                    node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
-                                                    batch.topicPartition, sequenceNumber);
+                                            log.debug("Assigning sequence number {} from producer {} to dequeued " +
+                                                            "batch from partition {} bound for {}.",
+                                                    sequenceNumber, producerIdAndEpoch, batch.topicPartition, node);
                                             batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional);
                                         }
                                         batch.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 4f1c7d4..8b3957f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -217,7 +217,6 @@ public class Sender implements Runnable {
         }
 
         long pollTimeout = sendProducerData(now);
-        log.trace("waiting {}ms in poll", pollTimeout);
         client.poll(pollTimeout, now);
     }
 
@@ -298,7 +297,6 @@ public class Sender implements Runnable {
     }
 
     private boolean maybeSendTransactionalRequest(long now) {
-        String transactionalId = transactionManager.transactionalId();
         if (transactionManager.isCompletingTransaction() &&
                 !transactionManager.hasPartitionsToAdd() &&
                 accumulator.hasUnflushedBatches()) {
@@ -315,22 +313,15 @@ public class Sender implements Runnable {
                 accumulator.beginFlush();
 
             // Do not send the EndTxn until all pending batches have been completed
-            if (accumulator.hasUnflushedBatches()) {
-                log.trace("TransactionalId: {} -- Waiting for pending batches to be flushed before completing transaction",
-                        transactionalId);
+            if (accumulator.hasUnflushedBatches())
                 return false;
-            }
         }
 
         TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
-        if (nextRequestHandler == null) {
-            log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionalId);
+        if (nextRequestHandler == null)
             return false;
-        }
 
         AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
-        log.trace("TransactionalId: {} -- Preparing to send request {}", transactionalId, requestBuilder);
-
         while (true) {
             Node targetNode = null;
             try {
@@ -340,6 +331,7 @@ public class Sender implements Runnable {
                         transactionManager.lookupCoordinator(nextRequestHandler);
                         break;
                     }
+
                     if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) {
                         transactionManager.lookupCoordinator(nextRequestHandler);
                         break;
@@ -347,33 +339,30 @@ public class Sender implements Runnable {
                 } else {
                     targetNode = awaitLeastLoadedNodeReady(requestTimeout);
                 }
+
                 if (targetNode != null) {
-                    if (nextRequestHandler.isRetry()) {
-                        log.trace("TransactionalId: {} -- Waiting {}ms before resending request {}",
-                                transactionalId,
-                                retryBackoffMs, requestBuilder);
+                    if (nextRequestHandler.isRetry())
                         time.sleep(retryBackoffMs);
-                    }
+
                     ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
                             requestBuilder, now, true, nextRequestHandler);
                     transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
-                    log.debug("TransactionalId: {} -- Sending transactional request {} to node {}",
-                            transactionalId, requestBuilder, clientRequest.destination());
+                    log.debug("{}Sending transactional request {} to node {}",
+                            transactionManager.logPrefix, requestBuilder, targetNode);
+
                     client.send(clientRequest, now);
                     return true;
                 }
             } catch (IOException e) {
-                log.debug("TransactionalId: {} -- Disconnect from {} while trying to send request {}. Going " +
-                                "to back off and retry", transactionalId, targetNode, requestBuilder);
+                log.debug("{}Disconnect from {} while trying to send request {}. Going " +
+                        "to back off and retry", transactionManager.logPrefix, targetNode, requestBuilder);
             }
-            log.trace("TransactionalId: {} -- About to wait for {}ms before trying to send another request.",
-                    transactionalId, retryBackoffMs);
+
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }
 
         transactionManager.retry(nextRequestHandler);
-
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 11068a7..9d9deac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -64,6 +64,7 @@ public class TransactionManager {
 
     private final String transactionalId;
     private final int transactionTimeoutMs;
+    public final String logPrefix;
 
     private final Map<TopicPartition, Integer> sequenceNumbers;
     private final PriorityQueue<TxnRequestHandler> pendingRequests;
@@ -136,6 +137,7 @@ public class TransactionManager {
         this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
         this.sequenceNumbers = new HashMap<>();
         this.transactionalId = transactionalId;
+        this.logPrefix = transactionalId == null ? "" : "[TransactionalId " + transactionalId + "] ";
         this.transactionTimeoutMs = transactionTimeoutMs;
         this.transactionCoordinator = null;
         this.consumerGroupCoordinator = null;
@@ -162,7 +164,7 @@ public class TransactionManager {
         this.sequenceNumbers.clear();
         InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
         InitProducerIdHandler handler = new InitProducerIdHandler(builder);
-        pendingRequests.add(handler);
+        enqueueRequest(handler);
         return handler.result;
     }
 
@@ -191,15 +193,14 @@ public class TransactionManager {
     }
 
     private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) {
-        if (!newPartitionsInTransaction.isEmpty()) {
-            pendingRequests.add(addPartitionsToTransactionHandler());
-        }
+        if (!newPartitionsInTransaction.isEmpty())
+            enqueueRequest(addPartitionsToTransactionHandler());
 
         TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
         EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
                 producerIdAndEpoch.epoch, transactionResult);
         EndTxnHandler handler = new EndTxnHandler(builder);
-        pendingRequests.add(handler);
+        enqueueRequest(handler);
         return handler.result;
     }
 
@@ -214,7 +215,7 @@ public class TransactionManager {
         AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
                 producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
         AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
-        pendingRequests.add(handler);
+        enqueueRequest(handler);
         return handler.result;
     }
 
@@ -304,6 +305,8 @@ public class TransactionManager {
      * Set the producer id and epoch atomically.
      */
     void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
+        log.info("{}ProducerId set to {} with epoch {}", logPrefix, producerIdAndEpoch.producerId,
+                producerIdAndEpoch.epoch);
         this.producerIdAndEpoch = producerIdAndEpoch;
     }
 
@@ -355,35 +358,35 @@ public class TransactionManager {
 
     synchronized TxnRequestHandler nextRequestHandler() {
         if (!newPartitionsInTransaction.isEmpty())
-            pendingRequests.add(addPartitionsToTransactionHandler());
+            enqueueRequest(addPartitionsToTransactionHandler());
 
         TxnRequestHandler nextRequestHandler = pendingRequests.poll();
         if (nextRequestHandler != null && maybeTerminateRequestWithError(nextRequestHandler)) {
-            log.trace("TransactionalId: {} -- Not sending transactional request {} because we are in an error state",
-                    transactionalId, nextRequestHandler.requestBuilder());
+            log.trace("{}Not sending transactional request {} because we are in an error state",
+                    logPrefix, nextRequestHandler.requestBuilder());
             return null;
         }
 
         if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && !transactionStarted) {
             nextRequestHandler.result.done();
             if (currentState != State.FATAL_ERROR) {
-                log.debug("TransactionId: {} -- Not sending EndTxn for completed transaction since no partitions " +
-                        "or offsets were successfully added", transactionalId);
+                log.debug("{}Not sending EndTxn for completed transaction since no partitions " +
+                        "or offsets were successfully added", logPrefix);
                 completeTransaction();
             }
-            return pendingRequests.poll();
+            nextRequestHandler = pendingRequests.poll();
         }
 
+
+        if (nextRequestHandler != null)
+            log.trace("{}Request {} dequeued for sending", logPrefix, nextRequestHandler.requestBuilder());
+
         return nextRequestHandler;
     }
 
     synchronized void retry(TxnRequestHandler request) {
         request.setRetry();
-        pendingRequests.add(request);
-    }
-
-    synchronized void reenqueue(TxnRequestHandler request) {
-        pendingRequests.add(request);
+        enqueueRequest(request);
     }
 
     Node coordinator(FindCoordinatorRequest.CoordinatorType type) {
@@ -445,7 +448,7 @@ public class TransactionManager {
             lastError = null;
         }
 
-        log.debug("TransactionalId {} -- Transition from state {} to {}", transactionalId, currentState, target);
+        log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
         currentState = target;
     }
 
@@ -474,6 +477,11 @@ public class TransactionManager {
         return false;
     }
 
+    private void enqueueRequest(TxnRequestHandler requestHandler) {
+        log.debug("{}Enqueuing transactional request {}", logPrefix, requestHandler.requestBuilder());
+        pendingRequests.add(requestHandler);
+    }
+
     private synchronized void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
         switch (type) {
             case GROUP:
@@ -487,7 +495,7 @@ public class TransactionManager {
         }
 
         FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
-        pendingRequests.add(new FindCoordinatorHandler(builder));
+        enqueueRequest(new FindCoordinatorHandler(builder));
     }
 
     private synchronized void completeTransaction() {
@@ -550,7 +558,7 @@ public class TransactionManager {
         void reenqueue() {
             synchronized (TransactionManager.this) {
                 this.isRetry = true;
-                pendingRequests.add(this);
+                enqueueRequest(this);
             }
         }
 
@@ -562,12 +570,13 @@ public class TransactionManager {
             } else {
                 clearInFlightRequestCorrelationId();
                 if (response.wasDisconnected()) {
-                    log.trace("disconnected from " + response.destination() + ". Will retry.");
+                    log.trace("{}Disconnected from {}. Will retry.", logPrefix, response.destination());
                     reenqueue();
                 } else if (response.versionMismatch() != null) {
                     fatalError(response.versionMismatch());
                 } else if (response.hasResponse()) {
-                    log.trace("Got transactional response for request:" + requestBuilder());
+                    log.trace("{}Received transactional response {} for request {}", logPrefix,
+                            response.responseBody(), requestBuilder());
                     synchronized (TransactionManager.this) {
                         handleResponse(response.responseBody());
                     }
@@ -630,9 +639,6 @@ public class TransactionManager {
             InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;
             Errors error = initProducerIdResponse.error();
 
-            log.debug("TransactionalId {} -- Received InitProducerId response with error {}",
-                    transactionalId, error);
-
             if (error == Errors.NONE) {
                 ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
@@ -676,9 +682,6 @@ public class TransactionManager {
             boolean hasPartitionErrors = false;
             Set<String> unauthorizedTopics = new HashSet<>();
 
-            log.debug("TransactionalId {} -- Received AddPartitionsToTxn response with errors {}",
-                    transactionalId, errors);
-
             for (Map.Entry<TopicPartition, Errors> topicPartitionErrorEntry : errors.entrySet()) {
                 TopicPartition topicPartition = topicPartitionErrorEntry.getKey();
                 Errors error = topicPartitionErrorEntry.getValue();
@@ -706,8 +709,7 @@ public class TransactionManager {
                 } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                     unauthorizedTopics.add(topicPartition.topic());
                 } else {
-                    log.error("TransactionalId: {} -- Could not add partition {} due to unexpected error {}",
-                            transactionalId, topicPartition, error);
+                    log.error("{}Could not add partition {} due to unexpected error {}", logPrefix, topicPartition, error);
                     hasPartitionErrors = true;
                 }
             }
@@ -758,9 +760,6 @@ public class TransactionManager {
             FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
             Errors error = findCoordinatorResponse.error();
 
-            log.debug("TransactionalId {} -- Received FindCoordinator response with error {}",
-                    transactionalId, error);
-
             if (error == Errors.NONE) {
                 Node node = findCoordinatorResponse.node();
                 switch (builder.coordinatorType()) {
@@ -812,9 +811,6 @@ public class TransactionManager {
             EndTxnResponse endTxnResponse = (EndTxnResponse) response;
             Errors error = endTxnResponse.error();
 
-            log.debug("TransactionalId {} -- Received EndTxn response with error {}",
-                    transactionalId, error);
-
             if (error == Errors.NONE) {
                 completeTransaction();
                 result.done();
@@ -860,9 +856,6 @@ public class TransactionManager {
             AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
             Errors error = addOffsetsToTxnResponse.error();
 
-            log.debug("TransactionalId {} -- Received AddOffsetsToTxn response with error {}",
-                    transactionalId, error);
-
             if (error == Errors.NONE) {
                 // note the result is not completed until the TxnOffsetCommit returns
                 pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
@@ -920,9 +913,6 @@ public class TransactionManager {
             boolean hadFailure = false;
             Map<TopicPartition, Errors> errors = txnOffsetCommitResponse.errors();
 
-            log.debug("TransactionalId {} -- Received TxnOffsetCommit response with errors {}",
-                    transactionalId, errors);
-
             for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
                 TopicPartition topicPartition = entry.getKey();
                 Errors error = entry.getValue();
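
The logPrefix field added above replaces the old per-message "TransactionalId: {} --"
arguments with one precomputed prefix. A minimal standalone sketch of the convention
(the id value is hypothetical, not from this patch):

    // Same construction as in the TransactionManager constructor in this patch.
    String transactionalId = "my-app-txn";  // hypothetical id for illustration
    String logPrefix = transactionalId == null ? "" : "[TransactionalId " + transactionalId + "] ";

    // A non-transactional producer (null id) gets an empty prefix, so the same
    // format strings serve both cases. This prints:
    //   [TransactionalId my-app-txn] Transition from state READY to IN_TRANSACTION
    System.out.println(logPrefix + "Transition from state READY to IN_TRANSACTION");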

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 0536636..981a234 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -67,4 +67,13 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
     public static AddOffsetsToTxnResponse parse(ByteBuffer buffer, short version) {
         return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer));
     }
+
+    @Override
+    public String toString() {
+        return "AddOffsetsToTxnResponse(" +
+                "error=" + error +
+                ", throttleTimeMs=" + throttleTimeMs +
+                ')';
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 4112b93..f05310a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -105,4 +105,13 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     public static AddPartitionsToTxnResponse parse(ByteBuffer buffer, short version) {
         return new AddPartitionsToTxnResponse(ApiKeys.ADD_PARTITIONS_TO_TXN.parseResponse(version, buffer));
     }
+
+    @Override
+    public String toString() {
+        return "AddPartitionsToTxnResponse(" +
+                "errors=" + errors +
+                ", throttleTimeMs=" + throttleTimeMs +
+                ')';
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 9083808..47a6623 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -66,4 +66,12 @@ public class EndTxnResponse extends AbstractResponse {
     public static EndTxnResponse parse(ByteBuffer buffer, short version) {
         return new EndTxnResponse(ApiKeys.END_TXN.parseResponse(version, buffer));
     }
+
+    @Override
+    public String toString() {
+        return "EndTxnResponse(" +
+                "error=" + error +
+                ", throttleTimeMs=" + throttleTimeMs +
+                ')';
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 96fee43..824a76f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -162,11 +162,14 @@ public class FetchResponse extends AbstractResponse {
 
         @Override
         public String toString() {
-            return "(error=" + error + ", highWaterMark=" + highWatermark +
+            return "(error=" + error +
+                    ", highWaterMark=" + highWatermark +
                     ", lastStableOffset = " + lastStableOffset +
                     ", logStartOffset = " + logStartOffset +
-                    ", abortedTransactions = " + abortedTransactions + ", records=" + records + ")";
+                    ", abortedTransactions = " + abortedTransactions +
+                    ", recordsSizeInBytes=" + records.sizeInBytes() + ")";
         }
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 11eed1d..ae6986a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -108,11 +108,11 @@ public class FindCoordinatorResponse extends AbstractResponse {
 
     @Override
     public String toString() {
-        return "FindCoordinatorResponse{" +
+        return "FindCoordinatorResponse(" +
                 "throttleTimeMs=" + throttleTimeMs +
                 ", errorMessage='" + errorMessage + '\'' +
                 ", error=" + error +
                 ", node=" + node +
-                '}';
+                ')';
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index da5e6e5..88fb09c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -87,4 +87,13 @@ public class InitProducerIdResponse extends AbstractResponse {
         return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
     }
 
+    @Override
+    public String toString() {
+        return "InitProducerIdResponse(" +
+                "error=" + error +
+                ", producerId=" + producerId +
+                ", producerEpoch=" + epoch +
+                ", throttleTimeMs=" + throttleTimeMs +
+                ')';
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 4c0f010..9a1cefa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -109,4 +109,12 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
         return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
     }
 
+    @Override
+    public String toString() {
+        return "TxnOffsetCommitResponse(" +
+                "errors=" + errors +
+                ", throttleTimeMs=" + throttleTimeMs +
+                ')';
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 44e32b1..5c39635 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager}
 import kafka.utils.{Logging, Scheduler, ZkUtils}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
@@ -144,8 +145,8 @@ class TransactionCoordinator(brokerId: Int,
       }
 
       result match {
-        case Left(producerIdResult) =>
-          responseCallback(producerIdResult)
+        case Left(error) =>
+          responseCallback(error)
 
         case Right((coordinatorEpoch, newMetadata)) =>
           if (newMetadata.txnState == Ongoing) {
@@ -165,10 +166,14 @@ class TransactionCoordinator(brokerId: Int,
               sendRetriableErrorCallback)
           } else {
             def sendPidResponseCallback(error: Errors): Unit = {
-              if (error == Errors.NONE)
+              if (error == Errors.NONE) {
+                info(s"Initialized transactionalId $transactionalId with producerId ${newMetadata.producerId} and producer " +
+                  s"epoch ${newMetadata.producerEpoch} on partition " +
+                  s"${Topic.TRANSACTION_STATE_TOPIC_NAME}-${txnManager.partitionFor(transactionalId)}")
                 responseCallback(initTransactionMetadata(newMetadata))
-              else
+              } else {
                 responseCallback(initTransactionError(error))
+              }
             }
 
             txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendPidResponseCallback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c37ea08..b9968a2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -674,7 +674,7 @@ class Log(@volatile var dir: File,
   }
 
   private def updateFirstUnstableOffset(): Unit = lock synchronized {
-    this.firstUnstableOffset = producerStateManager.firstUnstableOffset match {
+    val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match {
       case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly =>
         val offset = logOffsetMetadata.messageOffset
         val segment = segments.floorEntry(offset).getValue
@@ -682,6 +682,11 @@ class Log(@volatile var dir: File,
         Some(LogOffsetMetadata(offset, segment.baseOffset, position.position))
       case other => other
     }
+
+    if (updatedFirstStableOffset != this.firstUnstableOffset) {
+      debug(s"First unstable offset for ${this.name} updated to $updatedFirstStableOffset")
+      this.firstUnstableOffset = updatedFirstStableOffset
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 4b38c27..3680d10 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 
 import joptsimple.OptionParser
 import kafka.coordinator.group.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
+import kafka.coordinator.transaction.TransactionLog
 import kafka.log._
 import kafka.serializer.Decoder
 import kafka.utils._
@@ -60,8 +61,10 @@ object DumpLogSegments {
                                .withOptionalArg()
                                .ofType(classOf[java.lang.String])
                                .defaultsTo("kafka.serializer.StringDecoder")
-    val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from __consumer_offsets topic.")
-
+    val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from the " +
+      "__consumer_offsets topic.")
+    val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " +
+      "transaction metadata from the __transaction_state topic.")
 
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
@@ -70,7 +73,11 @@ object DumpLogSegments {
 
     CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
 
-    val printDataLog = options.has(printOpt) || options.has(offsetsOpt) || options.has(valueDecoderOpt) || options.has(keyDecoderOpt)
+    val printDataLog = options.has(printOpt) ||
+      options.has(offsetsOpt) ||
+      options.has(transactionLogOpt) ||
+      options.has(valueDecoderOpt) ||
+      options.has(keyDecoderOpt)
     val verifyOnly = options.has(verifyOpt)
     val indexSanityOnly = options.has(indexSanityOpt)
 
@@ -80,6 +87,8 @@ object DumpLogSegments {
 
     val messageParser = if (options.has(offsetsOpt)) {
       new OffsetsMessageParser
+    } else if (options.has(transactionLogOpt)) {
+      new TransactionLogMessageParser
     } else {
       val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
       val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
@@ -263,6 +272,24 @@ object DumpLogSegments {
     }
   }
 
+  private class TransactionLogMessageParser extends MessageParser[String, String] {
+
+    override def parse(record: Record): (Option[String], Option[String]) = {
+      val txnKey = TransactionLog.readTxnRecordKey(record.key)
+      val txnMetadata = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value)
+
+      val keyString = s"transactionalId=${txnKey.transactionalId}"
+      val valueString = s"producerId:${txnMetadata.producerId}," +
+        s"producerEpoch:${txnMetadata.producerEpoch}," +
+        s"state=${txnMetadata.state}," +
+        s"partitions=${txnMetadata.topicPartitions}," +
+        s"lastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}"
+
+      (Some(keyString), Some(valueString))
+    }
+
+  }
+
   private class OffsetsMessageParser extends MessageParser[String, String] {
     private def hex(bytes: Array[Byte]): String = {
       if (bytes.isEmpty)
@@ -356,7 +383,8 @@ object DumpLogSegments {
             " compresscodec: " + batch.compressionType)
 
           if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            print(" crc: " + batch.checksum + " sequence: " + record.sequence +
+            print(" producerId: " + batch.producerId + " sequence: " + record.sequence +
+              " isTransactional: " + batch.isTransactional +
               " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
           } else {
             print(" crc: " + record.checksumOrNull)
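
For reference, a hypothetical invocation of the new decoder against a __transaction_state
segment (the file path is an assumption; --files and --transaction-log-decoder are the
options defined above):

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
      --files /tmp/kafka-logs/__transaction_state-0/00000000000000000000.log \
      --transaction-log-decoder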

