kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/2] kafka git commit: MINOR: Rename InitPidRequest/InitPidResponse to InitProducerIdRequest/InitProducerIdResponse
Date Sat, 13 May 2017 03:02:23 GMT
MINOR: Rename InitPidRequest/InitPidResponse to InitProducerIdRequest/InitProducerIdResponse

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2997 from hachikuji/minor-rename-initpid


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1c8e7d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1c8e7d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1c8e7d9

Branch: refs/heads/trunk
Commit: a1c8e7d941ad9c765dac232435a297f905eeeed5
Parents: 1cb39f7
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri May 12 19:59:34 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri May 12 19:59:34 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   2 +-
 .../clients/producer/internals/PidAndEpoch.java |  36 ------
 .../producer/internals/ProducerBatch.java       |   4 +-
 .../producer/internals/ProducerIdAndEpoch.java  |  36 ++++++
 .../producer/internals/RecordAccumulator.java   |  12 +-
 .../clients/producer/internals/Sender.java      |  24 ++--
 .../producer/internals/TransactionManager.java  |  64 +++++-----
 .../errors/InvalidTxnTimeoutException.java      |   2 +-
 .../apache/kafka/common/protocol/Errors.java    |   3 +-
 .../kafka/common/requests/AbstractRequest.java  |   2 +-
 .../kafka/common/requests/AbstractResponse.java |   2 +-
 .../kafka/common/requests/InitPidRequest.java   | 104 ----------------
 .../kafka/common/requests/InitPidResponse.java  |  89 --------------
 .../common/requests/InitProducerIdRequest.java  | 104 ++++++++++++++++
 .../common/requests/InitProducerIdResponse.java |  89 ++++++++++++++
 .../clients/producer/internals/SenderTest.java  |  20 ++--
 .../internals/TransactionManagerTest.java       |  26 ++--
 .../common/requests/RequestResponseTest.java    |   8 +-
 .../transaction/ProducerIdManager.scala         | 119 ++++++++++---------
 .../transaction/TransactionCoordinator.scala    |  90 +++++++-------
 .../TransactionMarkerChannelManager.scala       |   7 +-
 .../scala/kafka/log/ProducerStateManager.scala  |  84 ++++++-------
 .../src/main/scala/kafka/server/KafkaApis.scala |  29 +++--
 .../main/scala/kafka/server/KafkaConfig.scala   |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   6 +-
 .../transaction/ProducerIdManagerTest.scala     |  14 +--
 .../TransactionCoordinatorIntegrationTest.scala |  16 +--
 .../TransactionCoordinatorTest.scala            |  50 ++++----
 .../unit/kafka/server/RequestQuotaTest.scala    |   4 +-
 29 files changed, 525 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b1f405a..05edf65 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -688,7 +688,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (transactionManager == null)
             return;
 
-        if (transactionManager.isTransactional() && !transactionManager.hasPid())
+        if (transactionManager.isTransactional() && !transactionManager.hasProducerId())
             throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
 
         if (transactionManager.isFenced())

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
deleted file mode 100644
index 8647a7b..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
-import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
-
-class PidAndEpoch {
-    static final PidAndEpoch NONE = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
-
-    public final long producerId;
-    public final short epoch;
-
-    PidAndEpoch(long producerId, short epoch) {
-        this.producerId = producerId;
-        this.epoch = epoch;
-    }
-
-    public boolean isValid() {
-        return NO_PRODUCER_ID < producerId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index f5fe8e6..3c5965a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -231,8 +231,8 @@ public final class ProducerBatch {
         return recordsBuilder.isFull();
     }
 
-    public void setProducerState(PidAndEpoch pidAndEpoch, int baseSequence) {
-        recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
+    public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) {
+        recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
new file mode 100644
index 0000000..01d5e86
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+class ProducerIdAndEpoch {
+    static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+
+    public final long producerId;
+    public final short epoch;
+
+    ProducerIdAndEpoch(long producerId, short epoch) {
+        this.producerId = producerId;
+        this.epoch = epoch;
+    }
+
+    public boolean isValid() {
+        return NO_PRODUCER_ID < producerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 4ffab0a..cf3736c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -444,16 +444,16 @@ public final class RecordAccumulator {
                                         // request
                                         break;
                                     } else {
-                                        PidAndEpoch pidAndEpoch = null;
+                                        ProducerIdAndEpoch producerIdAndEpoch = null;
                                         if (transactionManager != null) {
-                                            pidAndEpoch = transactionManager.pidAndEpoch();
-                                            if (!pidAndEpoch.isValid())
+                                            producerIdAndEpoch = transactionManager.pidAndEpoch();
+                                            if (!producerIdAndEpoch.isValid())
                                                 // we cannot send the batch until we have refreshed the PID
                                                 break;
                                         }
 
                                         ProducerBatch batch = deque.pollFirst();
-                                        if (pidAndEpoch != null && !batch.inRetry()) {
+                                        if (producerIdAndEpoch != null && !batch.inRetry()) {
                                             // If the batch is in retry, then we should not change the pid and
                                             // sequence number, since this may introduce duplicates. In particular,
                                             // the previous attempt may actually have been accepted, and if we change
@@ -461,9 +461,9 @@ public final class RecordAccumulator {
                                             // a duplicate.
                                             int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
                                             log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
-                                                    node, pidAndEpoch.producerId, pidAndEpoch.epoch,
+                                                    node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
                                                     batch.topicPartition, sequenceNumber);
-                                            batch.setProducerState(pidAndEpoch, sequenceNumber);
+                                            batch.setProducerState(producerIdAndEpoch, sequenceNumber);
                                         }
                                         batch.close();
                                         size += batch.sizeInBytes();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 4d95ac0..8b96b41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -42,8 +42,8 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.requests.InitPidRequest;
-import org.apache.kafka.common.requests.InitPidResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.Time;
@@ -357,7 +357,7 @@ public class Sender implements Runnable {
 
     private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException {
         String nodeId = node.idString();
-        InitPidRequest.Builder builder = new InitPidRequest.Builder(null);
+        InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
         ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null);
         return NetworkClientUtils.sendAndReceive(client, request, time);
     }
@@ -376,28 +376,28 @@ public class Sender implements Runnable {
         if (transactionManager == null || transactionManager.isTransactional())
             return;
 
-        while (!transactionManager.hasPid()) {
+        while (!transactionManager.hasProducerId()) {
             try {
                 Node node = awaitLeastLoadedNodeReady(requestTimeout);
                 if (node != null) {
                     ClientResponse response = sendAndAwaitInitPidRequest(node);
-                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
-                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
-                        PidAndEpoch pidAndEpoch = new PidAndEpoch(
-                                initPidResponse.producerId(), initPidResponse.epoch());
-                        transactionManager.setPidAndEpoch(pidAndEpoch);
+                    if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) {
+                        InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
+                        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
+                                initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+                        transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
                     } else {
-                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
+                        log.error("Received an unexpected response type for an InitProducerIdRequest from {}. " +
                                 "We will back off and try again.", node);
                     }
                 } else {
-                    log.debug("Could not find an available broker to send InitPidRequest to. " +
+                    log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
                             "We will back off and try again.");
                 }
             } catch (Exception e) {
                 log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
             }
-            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
+            log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index ff3f114..566ad7c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -35,8 +35,8 @@ import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.InitPidRequest;
-import org.apache.kafka.common.requests.InitPidResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -79,7 +79,7 @@ public class TransactionManager {
 
     private volatile State currentState = State.UNINITIALIZED;
     private volatile Exception lastError = null;
-    private volatile PidAndEpoch pidAndEpoch;
+    private volatile ProducerIdAndEpoch producerIdAndEpoch;
 
     private enum State {
         UNINITIALIZED,
@@ -130,7 +130,7 @@ public class TransactionManager {
     }
 
     public TransactionManager(String transactionalId, int transactionTimeoutMs) {
-        this.pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
         this.sequenceNumbers = new HashMap<>();
         this.transactionalId = transactionalId;
         this.transactionTimeoutMs = transactionTimeoutMs;
@@ -155,10 +155,10 @@ public class TransactionManager {
     public synchronized TransactionalRequestResult initializeTransactions() {
         ensureTransactional();
         transitionTo(State.INITIALIZING);
-        setPidAndEpoch(PidAndEpoch.NONE);
+        setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
         this.sequenceNumbers.clear();
-        InitPidRequest.Builder builder = new InitPidRequest.Builder(transactionalId, transactionTimeoutMs);
-        InitPidHandler handler = new InitPidHandler(builder);
+        InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
+        InitProducerIdHandler handler = new InitProducerIdHandler(builder);
         pendingRequests.add(handler);
         return handler.result;
     }
@@ -190,8 +190,8 @@ public class TransactionManager {
         }
 
         TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
-        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, pidAndEpoch.producerId,
-                pidAndEpoch.epoch, transactionResult);
+        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
+                producerIdAndEpoch.epoch, transactionResult);
         EndTxnHandler handler = new EndTxnHandler(builder);
         pendingRequests.add(handler);
         return handler.result;
@@ -206,7 +206,7 @@ public class TransactionManager {
                     "active transaction");
 
         AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, consumerGroupId);
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
         AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
         pendingRequests.add(handler);
         return handler.result;
@@ -226,8 +226,8 @@ public class TransactionManager {
         return transactionalId;
     }
 
-    public boolean hasPid() {
-        return pidAndEpoch.isValid();
+    public boolean hasProducerId() {
+        return producerIdAndEpoch.isValid();
     }
 
     public boolean isTransactional() {
@@ -262,20 +262,20 @@ public class TransactionManager {
     }
 
     /**
-     * Get the current pid and epoch without blocking. Callers must use {@link PidAndEpoch#isValid()} to
+     * Get the current pid and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
      * verify that the result is valid.
      *
-     * @return the current PidAndEpoch.
+     * @return the current ProducerIdAndEpoch.
      */
-    PidAndEpoch pidAndEpoch() {
-        return pidAndEpoch;
+    ProducerIdAndEpoch pidAndEpoch() {
+        return producerIdAndEpoch;
     }
 
     /**
      * Set the pid and epoch atomically.
      */
-    void setPidAndEpoch(PidAndEpoch pidAndEpoch) {
-        this.pidAndEpoch = pidAndEpoch;
+    void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
+        this.producerIdAndEpoch = producerIdAndEpoch;
     }
 
     /**
@@ -299,7 +299,7 @@ public class TransactionManager {
         if (isTransactional())
             throw new IllegalStateException("Cannot reset producer state for a transactional producer. " +
                     "You must either abort the ongoing transaction or reinitialize the transactional producer instead");
-        setPidAndEpoch(PidAndEpoch.NONE);
+        setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
         this.sequenceNumbers.clear();
     }
 
@@ -448,7 +448,7 @@ public class TransactionManager {
         pendingPartitionsToBeAddedToTransaction.addAll(newPartitionsToBeAddedToTransaction);
         newPartitionsToBeAddedToTransaction.clear();
         AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
         return new AddPartitionsToTxnHandler(builder);
     }
 
@@ -461,7 +461,7 @@ public class TransactionManager {
             pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
         }
         TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 pendingTxnOffsetCommits);
         return new TxnOffsetCommitHandler(result, builder);
     }
@@ -487,7 +487,7 @@ public class TransactionManager {
         void fenced() {
             log.error("Producer has become invalid, which typically means another producer with the same " +
                             "transactional.id has been started: producerId: {}. epoch: {}.",
-                    pidAndEpoch.producerId, pidAndEpoch.epoch);
+                    producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
             result.setError(Errors.INVALID_PRODUCER_EPOCH.exception());
             transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception());
             result.done();
@@ -548,15 +548,15 @@ public class TransactionManager {
         abstract Priority priority();
     }
 
-    private class InitPidHandler extends TxnRequestHandler {
-        private final InitPidRequest.Builder builder;
+    private class InitProducerIdHandler extends TxnRequestHandler {
+        private final InitProducerIdRequest.Builder builder;
 
-        private InitPidHandler(InitPidRequest.Builder builder) {
+        private InitProducerIdHandler(InitProducerIdRequest.Builder builder) {
             this.builder = builder;
         }
 
         @Override
-        InitPidRequest.Builder requestBuilder() {
+        InitProducerIdRequest.Builder requestBuilder() {
             return builder;
         }
 
@@ -567,11 +567,11 @@ public class TransactionManager {
 
         @Override
         public void handleResponse(AbstractResponse response) {
-            InitPidResponse initPidResponse = (InitPidResponse) response;
-            Errors error = initPidResponse.error();
+            InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;
+            Errors error = initProducerIdResponse.error();
             if (error == Errors.NONE) {
-                PidAndEpoch pidAndEpoch = new PidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
-                setPidAndEpoch(pidAndEpoch);
+                ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+                setProducerIdAndEpoch(producerIdAndEpoch);
                 transitionTo(State.READY);
                 lastError = null;
                 result.done();
@@ -581,7 +581,7 @@ public class TransactionManager {
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
             } else {
-                fatal(new KafkaException("Unexpected error in InitPidResponse; " + error.message()));
+                fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
             }
         }
     }
@@ -616,7 +616,7 @@ public class TransactionManager {
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
-            } else if (error == Errors.INVALID_PID_MAPPING) {
+            } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING) {
                 fatal(new KafkaException(error.exception()));
             } else if (error == Errors.INVALID_TXN_STATE) {
                 fatal(new KafkaException(error.exception()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
index e5df248..c751bc4 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.errors;
 
 /**
- * The transaction coordinator returns this error code if the timeout received via the InitPidRequest is larger than
+ * The transaction coordinator returns this error code if the timeout received via the InitProducerIdRequest is larger than
  * the `max.transaction.timeout.ms` config value.
  */
 public class InvalidTxnTimeoutException extends ApiException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 960fdda..58a0a2a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -439,7 +439,8 @@ public enum Errors {
                 return new InvalidTxnStateException(message);
             }
         }),
-    INVALID_PID_MAPPING(49, "The PID mapping is invalid",
+    INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producerId which is not currently assigned to " +
+            "its transactionalId",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 04f2602..3aeb879 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -179,7 +179,7 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 request = new DeleteRecordsRequest(struct, version);
                 break;
             case INIT_PRODUCER_ID:
-                request = new InitPidRequest(struct, version);
+                request = new InitProducerIdRequest(struct, version);
                 break;
             case OFFSET_FOR_LEADER_EPOCH:
                 request = new OffsetsForLeaderEpochRequest(struct, version);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index b76cb21..617934c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -95,7 +95,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case DELETE_RECORDS:
                 return new DeleteRecordsResponse(struct);
             case INIT_PRODUCER_ID:
-                return new InitPidResponse(struct);
+                return new InitProducerIdResponse(struct);
             case OFFSET_FOR_LEADER_EPOCH:
                 return new OffsetsForLeaderEpochResponse(struct);
             case ADD_PARTITIONS_TO_TXN:

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
deleted file mode 100644
index 57d32e2..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class InitPidRequest extends AbstractRequest {
-    public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
-
-    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
-
-    private final String transactionalId;
-    private final int transactionTimeoutMs;
-
-    public static class Builder extends AbstractRequest.Builder<InitPidRequest> {
-        private final String transactionalId;
-        private final int transactionTimeoutMs;
-
-        public Builder(String transactionalId) {
-            this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
-        }
-
-        public Builder(String transactionalId, int transactionTimeoutMs) {
-            super(ApiKeys.INIT_PRODUCER_ID);
-
-            if (transactionTimeoutMs <= 0)
-                throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
-
-            if (transactionalId != null && transactionalId.isEmpty())
-                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
-
-            this.transactionalId = transactionalId;
-            this.transactionTimeoutMs = transactionTimeoutMs;
-        }
-
-        @Override
-        public InitPidRequest build(short version) {
-            return new InitPidRequest(version, transactionalId, transactionTimeoutMs);
-        }
-
-        @Override
-        public String toString() {
-            return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
-                    transactionTimeoutMs + ")";
-        }
-    }
-
-    public InitPidRequest(Struct struct, short version) {
-        super(version);
-        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
-    }
-
-    private InitPidRequest(short version, String transactionalId, int transactionTimeoutMs) {
-        super(version);
-        this.transactionalId = transactionalId;
-        this.transactionTimeoutMs = transactionTimeoutMs;
-    }
-
-    @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new InitPidResponse(throttleTimeMs, Errors.forException(e));
-    }
-
-    public static InitPidRequest parse(ByteBuffer buffer, short version) {
-        return new InitPidRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
-    }
-
-    public String transactionalId() {
-        return transactionalId;
-    }
-
-    public int transactionTimeoutMs() {
-        return transactionTimeoutMs;
-    }
-
-    @Override
-    protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
-        return struct;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
deleted file mode 100644
index 3c858af..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.RecordBatch;
-
-import java.nio.ByteBuffer;
-
-public class InitPidResponse extends AbstractResponse {
-    /**
-     * Possible Error codes:
-     * OK
-     *
-     */
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String EPOCH_KEY_NAME = "producer_epoch";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private final int throttleTimeMs;
-    private final Errors error;
-    private final long producerId;
-    private final short epoch;
-
-    public InitPidResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.producerId = producerId;
-        this.epoch = epoch;
-    }
-
-    public InitPidResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
-        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
-        this.epoch = struct.getShort(EPOCH_KEY_NAME);
-    }
-
-    public InitPidResponse(int throttleTimeMs, Errors errors) {
-        this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
-    }
-
-    public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public long producerId() {
-        return producerId;
-    }
-
-    public Errors error() {
-        return error;
-    }
-
-    public short epoch() {
-        return epoch;
-    }
-
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(PRODUCER_ID_KEY_NAME, producerId);
-        struct.set(EPOCH_KEY_NAME, epoch);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        return struct;
-    }
-
-    public static InitPidResponse parse(ByteBuffer buffer, short version) {
-        return new InitPidResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
new file mode 100644
index 0000000..45f88a2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class InitProducerIdRequest extends AbstractRequest {
+    public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
+
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+    private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
+
+    private final String transactionalId;
+    private final int transactionTimeoutMs;
+
+    public static class Builder extends AbstractRequest.Builder<InitProducerIdRequest> {
+        private final String transactionalId;
+        private final int transactionTimeoutMs;
+
+        public Builder(String transactionalId) {
+            this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
+        }
+
+        public Builder(String transactionalId, int transactionTimeoutMs) {
+            super(ApiKeys.INIT_PRODUCER_ID);
+
+            if (transactionTimeoutMs <= 0)
+                throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
+
+            if (transactionalId != null && transactionalId.isEmpty())
+                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+
+            this.transactionalId = transactionalId;
+            this.transactionTimeoutMs = transactionTimeoutMs;
+        }
+
+        @Override
+        public InitProducerIdRequest build(short version) {
+            return new InitProducerIdRequest(version, transactionalId, transactionTimeoutMs);
+        }
+
+        @Override
+        public String toString() {
+            return "(type=InitProducerIdRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
+                    transactionTimeoutMs + ")";
+        }
+    }
+
+    public InitProducerIdRequest(Struct struct, short version) {
+        super(version);
+        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+        this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
+    }
+
+    private InitProducerIdRequest(short version, String transactionalId, int transactionTimeoutMs) {
+        super(version);
+        this.transactionalId = transactionalId;
+        this.transactionTimeoutMs = transactionTimeoutMs;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new InitProducerIdResponse(throttleTimeMs, Errors.forException(e));
+    }
+
+    public static InitProducerIdRequest parse(ByteBuffer buffer, short version) {
+        return new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
+    }
+
+    public String transactionalId() {
+        return transactionalId;
+    }
+
+    public int transactionTimeoutMs() {
+        return transactionTimeoutMs;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
+        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
+        return struct;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
new file mode 100644
index 0000000..7c8a6e5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.nio.ByteBuffer;
+
+public class InitProducerIdResponse extends AbstractResponse {
+    /**
+     * Possible Error codes:
+     * OK
+     *
+     */
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private final int throttleTimeMs;
+    private final Errors error;
+    private final long producerId;
+    private final short epoch;
+
+    public InitProducerIdResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.error = error;
+        this.producerId = producerId;
+        this.epoch = epoch;
+    }
+
+    public InitProducerIdResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+        this.epoch = struct.getShort(EPOCH_KEY_NAME);
+    }
+
+    public InitProducerIdResponse(int throttleTimeMs, Errors errors) {
+        this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public short epoch() {
+        return epoch;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(PRODUCER_ID_KEY_NAME, producerId);
+        struct.set(EPOCH_KEY_NAME, epoch);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
+
+    public static InitProducerIdResponse parse(ByteBuffer buffer, short version) {
+        return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index c01a375..bb13dcb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -37,9 +37,9 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
-import org.apache.kafka.common.requests.InitPidRequest;
-import org.apache.kafka.common.requests.InitPidResponse;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -382,11 +382,11 @@ public class SenderTest {
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
-                return body instanceof InitPidRequest;
+                return body instanceof InitProducerIdRequest;
             }
-        }, new InitPidResponse(0, Errors.NONE, producerId, (short) 0));
+        }, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0));
         sender.run(time.milliseconds());
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         assertEquals(producerId, transactionManager.pidAndEpoch().producerId);
         assertEquals((short) 0, transactionManager.pidAndEpoch().epoch);
     }
@@ -395,7 +395,7 @@ public class SenderTest {
     public void testSequenceNumberIncrement() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 
@@ -448,7 +448,7 @@ public class SenderTest {
     public void testAbortRetryWhenPidChanges() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 
@@ -480,7 +480,7 @@ public class SenderTest {
         assertEquals(0, client.inFlightRequestCount());
         assertFalse("Client ready status should be false", client.isReady(node, 0L));
 
-        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId + 1, (short) 0));
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
         sender.run(time.milliseconds()); // receive error
         sender.run(time.milliseconds()); // reconnect
         sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
@@ -497,7 +497,7 @@ public class SenderTest {
     public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 
@@ -528,7 +528,7 @@ public class SenderTest {
 
         sender.run(time.milliseconds());
         assertTrue(responseFuture.isDone());
-        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasPid());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
     }
 
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8e46eb7..53686e2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -44,8 +44,8 @@ import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.InitPidRequest;
-import org.apache.kafka.common.requests.InitPidResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.TransactionResult;
@@ -163,7 +163,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
@@ -275,7 +275,7 @@ public class TransactionManagerTest {
 
         assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
-        assertFalse(transactionManager.hasPid());
+        assertFalse(transactionManager.hasProducerId());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());
@@ -285,7 +285,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // get pid and epoch
 
         assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         assertEquals(pid, transactionManager.pidAndEpoch().producerId);
         assertEquals(epoch, transactionManager.pidAndEpoch().epoch);
     }
@@ -308,7 +308,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
 
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -365,7 +365,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         // User does one producer.send
         transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -428,7 +428,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
@@ -463,7 +463,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
@@ -530,12 +530,12 @@ public class TransactionManagerTest {
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
-                InitPidRequest initPidRequest = (InitPidRequest) body;
-                assertEquals(initPidRequest.transactionalId(), transactionalId);
-                assertEquals(initPidRequest.transactionTimeoutMs(), transactionTimeoutMs);
+                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
+                assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
+                assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
                 return true;
             }
-        }, new InitPidResponse(0, error, pid, epoch), shouldDisconnect);
+        }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
     }
 
     private void prepareProduceResponse(Errors error, final long pid, final short epoch) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b1e83bf..cbfb6a9 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -879,12 +879,12 @@ public class RequestResponseTest {
         return new DeleteTopicsResponse(errors);
     }
 
-    private InitPidRequest createInitPidRequest() {
-        return new InitPidRequest.Builder(null, 100).build();
+    private InitProducerIdRequest createInitPidRequest() {
+        return new InitProducerIdRequest.Builder(null, 100).build();
     }
 
-    private InitPidResponse createInitPidResponse() {
-        return new InitPidResponse(0, Errors.NONE, 3332, (short) 3);
+    private InitProducerIdResponse createInitPidResponse() {
+        return new InitProducerIdResponse(0, Errors.NONE, 3332, (short) 3);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index bb7f57b..916ffa9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -20,49 +20,49 @@ import kafka.common.KafkaException
 import kafka.utils.{Json, Logging, ZkUtils}
 
 /**
- * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds (PIDs) in a unique way
- * such that the same PID will not be assigned twice across multiple transaction coordinators.
+ * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
+ * such that the same producerId will not be assigned twice across multiple transaction coordinators.
  *
- * PIDs are managed via ZooKeeper, where the latest pid block is written on the corresponding ZK path by the manager who
- * claims the block, where the written block_start_pid and block_end_pid are both inclusive.
+ * ProducerIds are managed via ZooKeeper, where the latest producerId block is written on the corresponding ZK
+ * path by the manager who claims the block, where the written block_start and block_end are both inclusive.
  */
 object ProducerIdManager extends Logging {
   val CurrentVersion: Long = 1L
   val PidBlockSize: Long = 1000L
 
-  def generatePidBlockJson(pidBlock: ProducerIdBlock): String = {
+  def generateProducerIdBlockJson(producerIdBlock: ProducerIdBlock): String = {
     Json.encode(Map("version" -> CurrentVersion,
-      "broker" -> pidBlock.brokerId,
-      "block_start" -> pidBlock.blockStartPid.toString,
-      "block_end" -> pidBlock.blockEndPid.toString)
+      "broker" -> producerIdBlock.brokerId,
+      "block_start" -> producerIdBlock.blockStartId.toString,
+      "block_end" -> producerIdBlock.blockEndId.toString)
     )
   }
 
-  def parsePidBlockData(jsonData: String): ProducerIdBlock = {
+  def parseProducerIdBlockData(jsonData: String): ProducerIdBlock = {
     try {
       Json.parseFull(jsonData).flatMap { m =>
-        val pidBlockInfo = m.asInstanceOf[Map[String, Any]]
-        val brokerId = pidBlockInfo("broker").asInstanceOf[Int]
-        val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong
-        val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong
-        Some(ProducerIdBlock(brokerId, blockStartPID, blockEndPID))
-      }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData"))
+        val producerIdBlockInfo = m.asInstanceOf[Map[String, Any]]
+        val brokerId = producerIdBlockInfo("broker").asInstanceOf[Int]
+        val blockStart = producerIdBlockInfo("block_start").asInstanceOf[String].toLong
+        val blockEnd = producerIdBlockInfo("block_end").asInstanceOf[String].toLong
+        Some(ProducerIdBlock(brokerId, blockStart, blockEnd))
+      }.getOrElse(throw new KafkaException(s"Failed to parse the producerId block json $jsonData"))
     } catch {
       case e: java.lang.NumberFormatException =>
         // this should never happen: the written data has exceeded long type limit
-        fatal(s"Read jason data $jsonData contains pids that have exceeded long type limit")
+        fatal(s"Read json data $jsonData contains producerIds that have exceeded long type limit")
         throw e
     }
   }
 }
 
-case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) {
+case class ProducerIdBlock(brokerId: Int, blockStartId: Long, blockEndId: Long) {
   override def toString: String = {
-    val pidBlockInfo = new StringBuilder
-    pidBlockInfo.append("(brokerId:" + brokerId)
-    pidBlockInfo.append(",blockStartPID:" + blockStartPid)
-    pidBlockInfo.append(",blockEndPID:" + blockEndPid + ")")
-    pidBlockInfo.toString()
+    val producerIdBlockInfo = new StringBuilder
+    producerIdBlockInfo.append("(brokerId:" + brokerId)
+    producerIdBlockInfo.append(",blockStartProducerId:" + blockStartId)
+    producerIdBlockInfo.append(",blockEndProducerId:" + blockEndId + ")")
+    producerIdBlockInfo.toString()
   }
 }
 
@@ -70,84 +70,85 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
 
   this.logIdent = "[ProducerId Manager " + brokerId + "]: "
 
-  private var currentPIDBlock: ProducerIdBlock = null
-  private var nextPID: Long = -1L
+  private var currentProducerIdBlock: ProducerIdBlock = null
+  private var nextProducerId: Long = -1L
 
-  // grab the first block of PIDs
+  // grab the first block of producerIds
   this synchronized {
-    getNewPidBlock()
-    nextPID = currentPIDBlock.blockStartPid
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.blockStartId
   }
 
-  private def getNewPidBlock(): Unit = {
+  private def getNewProducerIdBlock(): Unit = {
     var zkWriteComplete = false
     while (!zkWriteComplete) {
-      // refresh current pid block from zookeeper again
-      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+      // refresh current producerId block from zookeeper again
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
 
-      // generate the new pid block
-      currentPIDBlock = dataOpt match {
+      // generate the new producerId block
+      currentProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
-          debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
+          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) {
-            // we have exhausted all pids (wow!), treat it as a fatal error
-            fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})")
-            throw new KafkaException("Have exhausted all pids.")
+          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+            // we have exhausted all producerIds (wow!), treat it as a fatal error
+            fatal(s"Exhausted all producerIds as the next block's end producerId will have exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize)
+          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
         case None =>
-          debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block")
+          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
           ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
       }
 
-      val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock)
+      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
 
-      // try to write the new pid block into zookeeper
-      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData))
+      // try to write the new producerId block into zookeeper
+      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.ProducerIdBlockPath,
+        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
       zkWriteComplete = succeeded
 
       if (zkWriteComplete)
-        info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version")
+        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
     }
   }
 
-  private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+  private def checkProducerIdBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
     try {
-      val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData)
-      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
       dataOpt match {
         case Some(data) =>
-          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
-          (currPIDBlock.equals(expectedPidBlock), zkVersion)
+          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
+          (currProducerIdBlock == expectedPidBlock, zkVersion)
         case None =>
           (false, -1)
       }
     } catch {
       case e: Exception =>
-        warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e)
-        
+        warn(s"Error while checking for producerId block Zk data on path $path: expected data $expectedData", e)
+
         (false, -1)
     }
   }
 
-  def nextPid(): Long = {
+  def generateProducerId(): Long = {
     this synchronized {
-      // grab a new block of PIDs if this block has been exhausted
-      if (nextPID > currentPIDBlock.blockEndPid) {
-        getNewPidBlock()
-        nextPID = currentPIDBlock.blockStartPid + 1
+      // grab a new block of producerIds if this block has been exhausted
+      if (nextProducerId > currentProducerIdBlock.blockEndId) {
+        getNewProducerIdBlock()
+        nextProducerId = currentProducerIdBlock.blockStartId + 1
       } else {
-        nextPID += 1
+        nextProducerId += 1
       }
 
-      nextPID - 1
+      nextProducerId - 1
     }
   }
 
   def shutdown() {
-    info(s"Shutdown complete: last PID assigned $nextPID")
+    info(s"Shutdown complete: last producerId assigned $nextProducerId")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 7632f3f..233f7d7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -47,20 +47,20 @@ object TransactionCoordinator {
       config.transactionTopicMinISR,
       config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
 
-    val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
+    val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils)
     val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
     val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false)
     val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, txnMarkerPurgatory, time)
 
-    new TransactionCoordinator(config.brokerId, scheduler, pidManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time)
+    new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time)
   }
 
-  private def initTransactionError(error: Errors): InitPidResult = {
-    InitPidResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error)
+  private def initTransactionError(error: Errors): InitProducerIdResult = {
+    InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error)
   }
 
-  private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitPidResult = {
-    InitPidResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE)
+  private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitProducerIdResult = {
+    InitProducerIdResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE)
   }
 }
 
@@ -74,7 +74,7 @@ object TransactionCoordinator {
  */
 class TransactionCoordinator(brokerId: Int,
                              scheduler: Scheduler,
-                             pidManager: ProducerIdManager,
+                             producerIdManager: ProducerIdManager,
                              txnManager: TransactionStateManager,
                              txnMarkerChannelManager: TransactionMarkerChannelManager,
                              txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
@@ -83,22 +83,21 @@ class TransactionCoordinator(brokerId: Int,
 
   import TransactionCoordinator._
 
-  type InitPidCallback = InitPidResult => Unit
+  type InitProducerIdCallback = InitProducerIdResult => Unit
   type AddPartitionsCallback = Errors => Unit
   type EndTxnCallback = Errors => Unit
 
   /* Active flag of the coordinator */
   private val isActive = new AtomicBoolean(false)
 
-  def handleInitPid(transactionalId: String,
-                    transactionTimeoutMs: Int,
-                    responseCallback: InitPidCallback): Unit = {
-
+  def handleInitProducerId(transactionalId: String,
+                           transactionTimeoutMs: Int,
+                           responseCallback: InitProducerIdCallback): Unit = {
     if (transactionalId == null || transactionalId.isEmpty) {
       // if the transactional id is not specified, then always blindly accept the request
-      // and return a new pid from the pid manager
-      val pid = pidManager.nextPid()
-      responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
+      // and return a new producerId from the producerId manager
+      val producerId = producerIdManager.generateProducerId()
+      responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
     } else if (!txnManager.isCoordinatorFor(transactionalId)) {
       // check if it is the assigned coordinator for the transactional id
       responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
@@ -108,12 +107,12 @@ class TransactionCoordinator(brokerId: Int,
       // check transactionTimeoutMs is not larger than the broker configured maximum allowed value
       responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
     } else {
-      // only try to get a new pid and update the cache if the transactional id is unknown
-      val result: Either[InitPidResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
+      // only try to get a new producerId and update the cache if the transactional id is unknown
+      val result: Either[InitProducerIdResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
         case None =>
-          val pid = pidManager.nextPid()
+          val producerId = producerIdManager.generateProducerId()
           val now = time.milliseconds()
-          val createdMetadata = new TransactionMetadata(producerId = pid,
+          val createdMetadata = new TransactionMetadata(producerId = producerId,
             producerEpoch = 0,
             txnTimeoutMs = transactionTimeoutMs,
             state = Empty,
@@ -129,7 +128,7 @@ class TransactionCoordinator(brokerId: Int,
           // in this case we will treat it as the metadata has existed already
           txnMetadata synchronized {
             if (!txnMetadata.eq(createdMetadata)) {
-              initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
+              initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
             } else {
               Right(coordinatorEpoch, txnMetadata.prepareNewPid(time.milliseconds()))
             }
@@ -140,13 +139,13 @@ class TransactionCoordinator(brokerId: Int,
           val txnMetadata = existingEpochAndMetadata.transactionMetadata
 
           txnMetadata synchronized {
-            initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
+            initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
           }
       }
 
       result match {
-        case Left(pidResult) =>
-          responseCallback(pidResult)
+        case Left(producerIdResult) =>
+          responseCallback(producerIdResult)
 
         case Right((coordinatorEpoch, newMetadata)) =>
           if (newMetadata.txnState == Ongoing) {
@@ -178,11 +177,10 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
-  private def initPidWithExistingMetadata(transactionalId: String,
-                                          transactionTimeoutMs: Int,
-                                          coordinatorEpoch: Int,
-                                          txnMetadata: TransactionMetadata): Either[InitPidResult, (Int, TransactionMetadataTransition)] = {
-
+  private def initProducerIdWithExistingMetadata(transactionalId: String,
+                                                 transactionTimeoutMs: Int,
+                                                 coordinatorEpoch: Int,
+                                                 txnMetadata: TransactionMetadata): Either[InitProducerIdResult, (Int, TransactionMetadataTransition)] = {
     if (txnMetadata.pendingTransitionInProgress) {
       // return a retriable exception to let the client backoff and retry
       Left(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
@@ -216,8 +214,8 @@ class TransactionCoordinator(brokerId: Int,
 
 
   def handleAddPartitionsToTransaction(transactionalId: String,
-                                       pid: Long,
-                                       epoch: Short,
+                                       producerId: Long,
+                                       producerEpoch: Short,
                                        partitions: collection.Set[TopicPartition],
                                        responseCallback: AddPartitionsCallback): Unit = {
     val error = validateTransactionalId(transactionalId)
@@ -225,10 +223,10 @@ class TransactionCoordinator(brokerId: Int,
       responseCallback(error)
     } else {
       // try to update the transaction metadata and append the updated metadata to txn log;
-      // if there is no such metadata treat it as invalid pid mapping error.
+      // if there is no such metadata treat it as invalid producerId mapping error.
       val result: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
         case None =>
-          Left(Errors.INVALID_PID_MAPPING)
+          Left(Errors.INVALID_PRODUCER_ID_MAPPING)
 
         case Some(epochAndMetadata) =>
           val coordinatorEpoch = epochAndMetadata.coordinatorEpoch
@@ -236,9 +234,9 @@ class TransactionCoordinator(brokerId: Int,
 
           // generate the new transaction metadata with added partitions
           txnMetadata synchronized {
-            if (txnMetadata.producerId != pid) {
-              Left(Errors.INVALID_PID_MAPPING)
-            } else if (txnMetadata.producerEpoch != epoch) {
+            if (txnMetadata.producerId != producerId) {
+              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+            } else if (txnMetadata.producerEpoch != producerEpoch) {
               Left(Errors.INVALID_PRODUCER_EPOCH)
             } else if (txnMetadata.pendingTransitionInProgress) {
               // return a retriable exception to let the client backoff and retry
@@ -274,8 +272,8 @@ class TransactionCoordinator(brokerId: Int,
   }
 
   def handleEndTransaction(transactionalId: String,
-                           pid: Long,
-                           epoch: Short,
+                           producerId: Long,
+                           producerEpoch: Short,
                            txnMarkerResult: TransactionResult,
                            responseCallback: EndTxnCallback): Unit = {
     val error = validateTransactionalId(transactionalId)
@@ -284,16 +282,16 @@ class TransactionCoordinator(brokerId: Int,
     else {
       val preAppendResult: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
         case None =>
-          Left(Errors.INVALID_PID_MAPPING)
+          Left(Errors.INVALID_PRODUCER_ID_MAPPING)
 
         case Some(epochAndTxnMetadata) =>
           val txnMetadata = epochAndTxnMetadata.transactionMetadata
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
           txnMetadata synchronized {
-            if (txnMetadata.producerId != pid)
-              Left(Errors.INVALID_PID_MAPPING)
-            else if (txnMetadata.producerEpoch != epoch)
+            if (txnMetadata.producerId != producerId)
+              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+            else if (txnMetadata.producerEpoch != producerEpoch)
               Left(Errors.INVALID_PRODUCER_EPOCH)
             else if (txnMetadata.pendingTransitionInProgress)
               Left(Errors.CONCURRENT_TRANSACTIONS)
@@ -343,9 +341,9 @@ class TransactionCoordinator(brokerId: Int,
 
                     val txnMetadata = epochAndMetadata.transactionMetadata
                     txnMetadata synchronized {
-                      if (txnMetadata.producerId != pid)
-                        Left(Errors.INVALID_PID_MAPPING)
-                      else if (txnMetadata.producerEpoch != epoch)
+                      if (txnMetadata.producerId != producerId)
+                        Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+                      else if (txnMetadata.producerEpoch != producerEpoch)
                         Left(Errors.INVALID_PRODUCER_EPOCH)
                       else if (txnMetadata.pendingTransitionInProgress)
                         Left(Errors.CONCURRENT_TRANSACTIONS)
@@ -452,11 +450,11 @@ class TransactionCoordinator(brokerId: Int,
     isActive.set(false)
     scheduler.shutdown()
     txnMarkerPurgatory.shutdown()
-    pidManager.shutdown()
+    producerIdManager.shutdown()
     txnManager.shutdown()
     txnMarkerChannelManager.shutdown()
     info("Shutdown complete.")
   }
 }
 
-case class InitPidResult(pid: Long, epoch: Short, error: Errors)
+case class InitProducerIdResult(producerId: Long, producerEpoch: Short, error: Errors)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index b7a2e80..90c9c42 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -251,7 +251,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
   }
 
-  def addTxnMarkersToBrokerQueue(transactionalId: String, pid: Long, epoch: Short, result: TransactionResult, coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = {
+  def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, producerEpoch: Short,
+                                 result: TransactionResult, coordinatorEpoch: Int,
+                                 topicPartitions: immutable.Set[TopicPartition]): Unit = {
     val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
     val partitionsByDestination: immutable.Map[Node, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
       var brokerNode: Option[Node] = None
@@ -269,7 +271,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
 
     for ((broker: Node, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
-      val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, new TxnMarkerEntry(pid, epoch, coordinatorEpoch, result, topicPartitions.toList.asJava))
+      val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
+      val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
       addMarkersForBroker(broker, txnTopicPartition, txnIdAndMarker)
     }
 


Mime
View raw message