kafka-commits mailing list archives

From j...@apache.org
Subject [5/5] kafka git commit: KAFKA-4763; Handle disk failure for JBOD (KIP-112)
Date Sat, 22 Jul 2017 19:36:06 GMT
KAFKA-4763; Handle disk failure for JBOD (KIP-112)

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Onur Karaman <okaraman@linkedin.com>

Closes #2929 from lindong28/KAFKA-4763


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc93fb4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc93fb4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc93fb4b

Branch: refs/heads/trunk
Commit: fc93fb4b6116e809f2b69ddb2f7e0f12548fef51
Parents: 91b5fc7
Author: Dong Lin <lindong28@gmail.com>
Authored: Sat Jul 22 12:35:32 2017 -0700
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Sat Jul 22 12:35:32 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |   5 +-
 .../org/apache/kafka/common/PartitionInfo.java  |  21 +-
 .../common/errors/KafkaStorageException.java    |  50 ++
 .../apache/kafka/common/protocol/Errors.java    |  21 +-
 .../apache/kafka/common/protocol/Protocol.java  | 138 +++++-
 .../apache/kafka/common/record/FileRecords.java |   7 +
 .../kafka/common/requests/FetchResponse.java    |  11 +-
 .../common/requests/LeaderAndIsrRequest.java    |  12 +-
 .../kafka/common/requests/MetadataRequest.java  |   1 +
 .../kafka/common/requests/MetadataResponse.java |  57 ++-
 .../kafka/common/requests/PartitionState.java   |   8 +-
 .../kafka/common/requests/ProduceRequest.java   |   2 +
 .../kafka/common/requests/ProduceResponse.java  |   9 +-
 .../common/requests/UpdateMetadataRequest.java  |  58 ++-
 .../internals/AbstractCoordinatorTest.java      |   4 +-
 .../internals/ConsumerCoordinatorTest.java      |   4 +-
 .../clients/consumer/internals/FetcherTest.java |   3 +-
 .../internals/TransactionManagerTest.java       |   2 +-
 .../apache/kafka/common/PartitionInfoTest.java  |  15 +-
 .../kafka/common/protocol/ErrorsTest.java       |   4 +-
 .../common/requests/RequestResponseTest.java    |  21 +-
 .../main/scala/kafka/admin/AdminClient.scala    |   2 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |   2 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  12 +-
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |  15 +-
 .../main/scala/kafka/cluster/Partition.scala    | 119 +++--
 .../kafka/common/KafkaStorageException.scala    |   6 +-
 .../controller/ControllerChannelManager.scala   |  51 +-
 .../kafka/controller/ControllerState.scala      |  11 +-
 .../kafka/controller/KafkaController.scala      | 169 +++++--
 .../controller/PartitionLeaderSelector.scala    |  13 +-
 .../controller/PartitionStateMachine.scala      |   7 +-
 .../kafka/controller/ReplicaStateMachine.scala  |   5 +-
 .../kafka/controller/TopicDeletionManager.scala |   2 +-
 .../group/GroupMetadataManager.scala            |  10 +-
 .../transaction/TransactionCoordinator.scala    |   2 +-
 .../transaction/TransactionStateManager.scala   |   5 +-
 .../main/scala/kafka/log/AbstractIndex.scala    |   6 +
 core/src/main/scala/kafka/log/Log.scala         | 495 ++++++++++---------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 107 ++--
 .../scala/kafka/log/LogCleanerManager.scala     |  59 ++-
 core/src/main/scala/kafka/log/LogManager.scala  | 491 +++++++++++-------
 core/src/main/scala/kafka/log/LogSegment.scala  |  47 +-
 .../scala/kafka/log/ProducerStateManager.scala  |  10 +-
 .../kafka/server/AbstractFetcherThread.scala    |  10 +-
 .../kafka/server/BrokerMetadataCheckpoint.scala |   3 +-
 .../kafka/server/DelayedDeleteRecords.scala     |  18 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |   6 +-
 .../scala/kafka/server/DelayedProduce.scala     |   5 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  70 ++-
 .../main/scala/kafka/server/KafkaServer.scala   |  42 +-
 .../kafka/server/LogDirFailureChannel.scala     |  55 +++
 .../main/scala/kafka/server/MetadataCache.scala |  21 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  68 ++-
 .../scala/kafka/server/ReplicaManager.scala     | 296 ++++++++---
 .../server/checkpoints/CheckpointFile.scala     | 124 ++---
 .../checkpoints/LeaderEpochCheckpointFile.scala |   5 +-
 .../checkpoints/OffsetCheckpointFile.scala      |   5 +-
 .../main/scala/kafka/utils/LogDirUtils.scala    |  66 +++
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   7 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   8 +-
 .../kafka/api/IntegrationTestHarness.scala      |   5 +-
 .../kafka/api/LogDirFailureTest.scala           | 126 +++++
 .../kafka/api/TransactionsTest.scala            |   4 +-
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   2 +-
 .../group/GroupMetadataManagerTest.scala        |  10 +-
 ...tionMarkerRequestCompletionHandlerTest.scala |   2 +-
 .../TransactionStateManagerTest.scala           |   2 +-
 .../log/AbstractLogCleanerIntegrationTest.scala |   3 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |   2 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  10 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |  22 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  12 +-
 .../server/HighwatermarkPersistenceTest.scala   |  26 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |  14 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   4 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   4 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |  18 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala |   6 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala |   4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  43 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |   7 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   5 +-
 .../checkpoints/OffsetCheckpointFileTest.scala  |  14 +-
 .../epoch/OffsetsForLeaderEpochTest.scala       |  26 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  20 +-
 docs/upgrade.html                               |  57 ++-
 .../integration/utils/IntegrationTestUtils.java |   8 +-
 .../internals/InternalTopicManagerTest.java     |   2 +-
 tests/kafkatest/services/kafka/config.py        |   2 +-
 .../kafkatest/services/kafka/config_property.py |   4 +
 tests/kafkatest/services/kafka/kafka.py         |  59 ++-
 tests/kafkatest/services/verifiable_producer.py |   5 +-
 .../tests/core/log_dir_failure_test.py          | 177 +++++++
 tests/kafkatest/tests/core/transactions_test.py |   3 +-
 97 files changed, 2526 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f5ebac5..87f04b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -864,7 +864,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
                 }
-            } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
+            } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
+                       error == Errors.KAFKA_STORAGE_ERROR) {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                 this.metadata.requestUpdate();
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
@@ -884,7 +885,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                 log.warn("Not authorized to read from topic {}.", tp.topic());
                 throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
-            } else if (error == Errors.UNKNOWN) {
+            } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
                 log.warn("Unknown error fetching data for topic-partition {}", tp);
             } else {
                 throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
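
For illustration, here is a minimal standalone sketch of the consumer-side dispatch in the hunk above; the enum and helper are simplified stand-ins, not Kafka's actual Errors or Fetcher types. The point of the change is that KAFKA_STORAGE_ERROR now takes the same path as NOT_LEADER_FOR_PARTITION: refresh metadata and retry rather than fail the fetch.

    // Simplified stand-in types; not the real org.apache.kafka classes.
    public class FetchErrorDispatchSketch {
        enum Error { NONE, NOT_LEADER_FOR_PARTITION, KAFKA_STORAGE_ERROR, UNKNOWN_SERVER_ERROR }

        // Returns true if the caller should refresh metadata and retry the fetch,
        // mirroring the branch added in Fetcher above.
        static boolean shouldRefreshMetadataAndRetry(Error error) {
            return error == Error.NOT_LEADER_FOR_PARTITION
                || error == Error.KAFKA_STORAGE_ERROR;
        }

        public static void main(String[] args) {
            System.out.println(shouldRefreshMetadataAndRetry(Error.KAFKA_STORAGE_ERROR));  // true
            System.out.println(shouldRefreshMetadataAndRetry(Error.UNKNOWN_SERVER_ERROR)); // false
        }
    }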

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index b351116..38e4f67 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common;
 
 /**
- * Information about a topic-partition.
+ * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
 
@@ -26,13 +26,20 @@ public class PartitionInfo {
     private final Node leader;
     private final Node[] replicas;
     private final Node[] inSyncReplicas;
+    private final Node[] offlineReplicas;
 
+    // Used only by tests
     public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
+        this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
+    }
+
+    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas, Node[] offlineReplicas) {
         this.topic = topic;
         this.partition = partition;
         this.leader = leader;
         this.replicas = replicas;
         this.inSyncReplicas = inSyncReplicas;
+        this.offlineReplicas = offlineReplicas;
     }
 
     /**
@@ -71,14 +78,22 @@ public class PartitionInfo {
         return inSyncReplicas;
     }
 
+    /**
+     * The subset of the replicas that are offline
+     */
+    public Node[] offlineReplicas() {
+        return offlineReplicas;
+    }
+
     @Override
     public String toString() {
-        return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s)",
+        return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)",
                              topic,
                              partition,
                              leader == null ? "none" : leader.idString(),
                              formatNodeIds(replicas),
-                             formatNodeIds(inSyncReplicas));
+                             formatNodeIds(inSyncReplicas),
+                             formatNodeIds(offlineReplicas));
     }
 
     /* Extract the node ids from each item in the array and format for display */
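
A usage sketch against the six-argument constructor added above (the broker hosts and topic name here are made up for illustration):

    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;

    public class PartitionInfoExample {
        public static void main(String[] args) {
            Node n0 = new Node(0, "broker0.example", 9092);
            Node n1 = new Node(1, "broker1.example", 9092);
            // Replica 1 is offline, e.g. because one of its log directories failed.
            PartitionInfo info = new PartitionInfo("test-topic", 0, n0,
                    new Node[]{n0, n1},  // all replicas
                    new Node[]{n0},      // in-sync replicas
                    new Node[]{n1});     // offline replicas
            System.out.println(info.offlineReplicas().length); // 1
            System.out.println(info); // toString now includes offlineReplicas
        }
    }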

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
new file mode 100644
index 0000000..00c7cee
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that a miscellaneous disk-related IOException occurred while handling a request.
+ * The client should request a metadata update and retry if the response shows a KafkaStorageException.
+ *
+ * Here are the guidelines on how to handle KafkaStorageException and IOException:
+ *
+ * 1) If the server has not finished loading logs, an IOException does not need to be converted to a KafkaStorageException
+ * 2) After the server has finished loading logs, an IOException should be caught and trigger LogDirFailureChannel.maybeAddLogFailureEvent.
+ *    The IOException should then either be swallowed and logged, or be converted and re-thrown as a KafkaStorageException
+ * 3) It is preferred for IOException to be caught in Log rather than in ReplicaManager or LogSegment.
+ *
+ */
+public class KafkaStorageException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    public KafkaStorageException() {
+        super();
+    }
+
+    public KafkaStorageException(String message) {
+        super(message);
+    }
+
+    public KafkaStorageException(Throwable cause) {
+        super(cause);
+    }
+
+    public KafkaStorageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
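
A minimal sketch of guideline 2 above: once logs are loaded, a disk I/O failure is reported to a failure channel and re-thrown as a KafkaStorageException. The FailureChannel interface here is a stand-in for illustration, not the real kafka.server.LogDirFailureChannel API.

    import java.io.IOException;
    import org.apache.kafka.common.errors.KafkaStorageException;

    public class StorageErrorHandlingSketch {
        // Stand-in for the failure channel; not the real LogDirFailureChannel API.
        interface FailureChannel {
            void maybeAddLogFailureEvent(String logDir);
        }

        static void appendToLog(String logDir, FailureChannel channel) {
            try {
                writeToDisk(logDir); // some log I/O that may fail
            } catch (IOException e) {
                channel.maybeAddLogFailureEvent(logDir); // take the whole log dir offline
                // KafkaStorageException is unchecked, so it propagates without being declared.
                throw new KafkaStorageException("I/O error while appending to log in dir " + logDir, e);
            }
        }

        static void writeToDisk(String logDir) throws IOException {
            throw new IOException("simulated disk failure");
        }

        public static void main(String[] args) {
            FailureChannel channel = dir -> System.out.println("offline log dir: " + dir);
            try {
                appendToLog("/data/kafka-logs", channel);
            } catch (KafkaStorageException e) {
                System.out.println("caught: " + e.getMessage());
            }
        }
    }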

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ae8d161..19acfd6 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
+import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
@@ -84,10 +85,15 @@ import java.util.Map;
  * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
  * are thus part of the protocol. The names can be changed but the error code cannot.
  *
+ * Note that an old client library will convert an unknown error code to the non-retriable UnknownServerException
+ * because it does not recognize a newly-added error code. Therefore, when a new server-side error is added,
+ * we may need extra logic to convert the new error code to another existing error code before sending the response back to
+ * the client, if the request version suggests that the client may not recognize the new error code.
+ *
  * Do not add exceptions that occur only on the client or only on the server here.
  */
 public enum Errors {
-    UNKNOWN(-1, "The server experienced an unexpected error when processing the request",
+    UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {
@@ -495,7 +501,14 @@ public enum Errors {
             public ApiException build(String message) {
                 return new OperationNotAttemptedException(message);
             }
-        });
+    }),
+    KAFKA_STORAGE_ERROR(56, "Disk error when trying to access log file on the disk.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new KafkaStorageException(message);
+            }
+    });
 
     private interface ApiExceptionBuilder {
         ApiException build(String message);
@@ -588,7 +601,7 @@ public enum Errors {
             return error;
         } else {
             log.warn("Unexpected error code: {}.", code);
-            return UNKNOWN;
+            return UNKNOWN_SERVER_ERROR;
         }
     }
 
@@ -604,7 +617,7 @@ public enum Errors {
                 return error;
             clazz = clazz.getSuperclass();
         }
-        return UNKNOWN;
+        return UNKNOWN_SERVER_ERROR;
     }
 
     private static String toHtml() {
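
A small usage sketch of the fallback behavior in the hunks above (assuming they sit inside Errors.forCode, which is consistent with their bodies): a known code maps to its constant, while an unrecognized code falls back to UNKNOWN_SERVER_ERROR. This fallback is exactly why old clients see new server-side error codes as a non-retriable unknown error.

    import org.apache.kafka.common.protocol.Errors;

    public class ErrorsLookupExample {
        public static void main(String[] args) {
            System.out.println(Errors.forCode((short) 56));    // KAFKA_STORAGE_ERROR (code added in this patch)
            // An unrecognized code does not throw; it falls back to UNKNOWN_SERVER_ERROR.
            System.out.println(Errors.forCode((short) 12345)); // UNKNOWN_SERVER_ERROR
        }
    }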

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 383332b..329d99b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -78,6 +78,9 @@ public class Protocol {
                                                                           "topics that don't exist will be created by the broker. " +
                                                                           "Otherwise, no topics will be created by the broker."));
 
+    /* The v5 metadata request is the same as v4. An additional field for offline_replicas has been added to the v5 metadata response */
+    public static final Schema METADATA_REQUEST_V5 = METADATA_REQUEST_V4;
+
     public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
                                                    new Field("host", STRING, "The hostname of the broker."),
                                                    new Field("port", INT32,
@@ -121,12 +124,40 @@ public class Protocol {
 
     public static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0;
 
+    // PARTITION_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    public static final Schema PARTITION_METADATA_V2 = new Schema(new Field("partition_error_code",
+                                                                            INT16,
+                                                                            "The error code for the partition, if any."),
+                                                                  new Field("partition_id",
+                                                                            INT32,
+                                                                            "The id of the partition."),
+                                                                  new Field("leader",
+                                                                            INT32,
+                                                                            "The id of the broker acting as leader for this partition."),
+                                                                  new Field("replicas",
+                                                                            new ArrayOf(INT32),
+                                                                            "The set of all nodes that host this partition."),
+                                                                  new Field("isr",
+                                                                            new ArrayOf(INT32),
+                                                                            "The set of nodes that are in sync with the leader for this partition."),
+                                                                  new Field("offline_replicas",
+                                                                            new ArrayOf(INT32),
+                                                                            "The set of offline replicas of this partition."));
+
     public static final Schema TOPIC_METADATA_V1 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
                                                               new Field("topic", STRING, "The name of the topic"),
                                                               new Field("is_internal", BOOLEAN,
                                                                   "Indicates if the topic is considered a Kafka internal topic"),
                                                               new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V1),
-                                                                "Metadata for each partition of the topic."));
+                                                                  "Metadata for each partition of the topic."));
+
+    // TOPIC_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    public static final Schema TOPIC_METADATA_V2 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
+                                                              new Field("topic", STRING, "The name of the topic"),
+                                                              new Field("is_internal", BOOLEAN,
+                                                                  "Indicates if the topic is considered a Kafka internal topic"),
+                                                              new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V2),
+                                                                  "Metadata for each partition of the topic."));
 
     public static final Schema METADATA_RESPONSE_V1 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
                                                                     "Host and port information for all brokers."),
@@ -154,8 +185,19 @@ public class Protocol {
 
     public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;
 
-    public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4};
-    public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4};
+    // METADATA_RESPONSE_V5 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    public static final Schema METADATA_RESPONSE_V5 = new Schema(
+        newThrottleTimeField(),
+        new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
+            "Host and port information for all brokers."),
+        new Field("cluster_id", NULLABLE_STRING,
+            "The cluster id that this broker belongs to."),
+        new Field("controller_id", INT32,
+            "The broker id of the controller broker."),
+        new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V2)));
+
+    public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4, METADATA_REQUEST_V5};
+    public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4, METADATA_RESPONSE_V5};
 
     /* Produce api */
 
@@ -205,6 +247,13 @@ public class Protocol {
             new Field("timeout", INT32, "The time to await a response in ms."),
             new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
 
+    /**
+     * The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3.
+     * The version number is bumped up to indicate that the client supports KafkaStorageException.
+     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
+     */
+    public static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3;
+
     public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
                                                                           new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                                  new Field("partition_responses",
@@ -236,10 +285,18 @@ public class Protocol {
                                                                                                                             "If LogAppendTime is used for the topic, the timestamp will be " +
                                                                                                                             "the broker local time when the messages are appended."))))))),
                                                                 newThrottleTimeField());
+
     public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2;
 
-    public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};
-    public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3};
+    /**
+     * The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3.
+     * The version number is bumped up to indicate that the client supports KafkaStorageException.
+     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
+     */
+    public static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3;
+
+    public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, PRODUCE_REQUEST_V4};
+    public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, PRODUCE_RESPONSE_V4};
 
     /* Offset commit api */
     public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -666,6 +723,13 @@ public class Protocol {
                     new ArrayOf(FETCH_REQUEST_TOPIC_V5),
                     "Topics to fetch in the order provided."));
 
+    /**
+     * The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5.
+     * The version number is bumped up to indicate that the client supports KafkaStorageException.
+     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
+     */
+    public static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5;
+
     public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
                                                                                          INT32,
                                                                                          "Topic partition id."),
@@ -692,7 +756,6 @@ public class Protocol {
     public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
     public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
 
-
     // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
     // last stable offset). It also exposes messages with magic v2 (along with older formats).
     private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
@@ -759,8 +822,15 @@ public class Protocol {
             newThrottleTimeField(),
             new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
 
-    public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
-    public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5};
+    /**
+     * The body of FETCH_RESPONSE_V6 is the same as FETCH_RESPONSE_V5.
+     * The version number is bumped up to indicate that the client supports KafkaStorageException.
+     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
+     */
+    public static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5;
+
+    public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5, FETCH_REQUEST_V6};
+    public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6};
 
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
@@ -1039,6 +1109,19 @@ public class Protocol {
                        new Field("zk_version", INT32, "The ZK version."),
                        new Field("replicas", new ArrayOf(INT32), "The replica ids."));
 
+    // LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 added a per-partition is_new field.
+    // This field specifies whether the replica should have existed on the broker or not.
+    public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 =
+            new Schema(new Field("topic", STRING, "Topic name."),
+                       new Field("partition", INT32, "Topic partition id."),
+                       new Field("controller_epoch", INT32, "The controller epoch."),
+                       new Field("leader", INT32, "The broker id for the leader."),
+                       new Field("leader_epoch", INT32, "The leader epoch."),
+                       new Field("isr", new ArrayOf(INT32), "The in sync replica ids."),
+                       new Field("zk_version", INT32, "The ZK version."),
+                       new Field("replicas", new ArrayOf(INT32), "The replica ids."),
+                       new Field("is_new", BOOLEAN, "Whether the replica should have existed on the broker or not"));
+
     public static final Schema LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0 =
             new Schema(new Field("id", INT32, "The broker id."),
                        new Field("host", STRING, "The hostname of the broker."),
@@ -1050,6 +1133,13 @@ public class Protocol {
                                                                                 new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)),
                                                                       new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
 
+    // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new field. This field specifies whether the replica should have existed on the broker or not.
+    public static final Schema LEADER_AND_ISR_REQUEST_V1 = new Schema(new Field("controller_id", INT32, "The controller id."),
+                                                                      new Field("controller_epoch", INT32, "The controller epoch."),
+                                                                      new Field("partition_states",
+                                                                                new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1)),
+                                                                      new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
+
     public static final Schema LEADER_AND_ISR_RESPONSE_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
                                                                                  new Field("partition", INT32, "Topic partition id."),
                                                                                  new Field("error_code", INT16, "Error code."));
@@ -1058,8 +1148,11 @@ public class Protocol {
                                                                        new Field("partitions",
                                                                                  new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0)));
 
-    public static final Schema[] LEADER_AND_ISR_REQUEST = {LEADER_AND_ISR_REQUEST_V0};
-    public static final Schema[] LEADER_AND_ISR_RESPONSE = {LEADER_AND_ISR_RESPONSE_V0};
+    // LeaderAndIsrResponse V1 may contain KAFKA_STORAGE_ERROR in its per-partition error codes
+    public static final Schema LEADER_AND_ISR_RESPONSE_V1 = LEADER_AND_ISR_RESPONSE_V0;
+
+    public static final Schema[] LEADER_AND_ISR_REQUEST = {LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1};
+    public static final Schema[] LEADER_AND_ISR_RESPONSE = {LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1};
 
     /* Replica api */
     public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
@@ -1141,6 +1234,18 @@ public class Protocol {
 
     public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2;
 
+    // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 =
+            new Schema(new Field("topic", STRING, "Topic name."),
+                       new Field("partition", INT32, "Topic partition id."),
+                       new Field("controller_epoch", INT32, "The controller epoch."),
+                       new Field("leader", INT32, "The broker id for the leader."),
+                       new Field("leader_epoch", INT32, "The leader epoch."),
+                       new Field("isr", new ArrayOf(INT32), "The in sync replica ids."),
+                       new Field("zk_version", INT32, "The ZK version."),
+                       new Field("replicas", new ArrayOf(INT32), "The replica ids."),
+                       new Field("offline_replicas", new ArrayOf(INT32), "The offline replica ids"));
+
     public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V3 =
             new Schema(new Field("port", INT32, "The port on which the broker accepts requests."),
                     new Field("host", STRING, "The hostname of the broker."),
@@ -1158,12 +1263,21 @@ public class Protocol {
                     new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V3)),
                     new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3)));
 
+    // UPDATE_METADATA_REQUEST_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
+    public static final Schema UPDATE_METADATA_REQUEST_V4 =
+            new Schema(new Field("controller_id", INT32, "The controller id."),
+                    new Field("controller_epoch", INT32, "The controller epoch."),
+                    new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V4)),
+                    new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3)));
+
     public static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2;
 
+    public static final Schema UPDATE_METADATA_RESPONSE_V4 = UPDATE_METADATA_RESPONSE_V3;
+
     public static final Schema[] UPDATE_METADATA_REQUEST = {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1,
-        UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3};
+        UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3, UPDATE_METADATA_REQUEST_V4};
     public static final Schema[] UPDATE_METADATA_RESPONSE = {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1,
-        UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3};
+        UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3, UPDATE_METADATA_RESPONSE_V4};
 
     /* SASL handshake api */
     public static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema(
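
The schema bumps above all rely on one compatibility idiom: a request or response object reads a newly added field only when the negotiated version's schema carries it, and substitutes a default otherwise (see, e.g., the is_new handling in LeaderAndIsrRequest below). A standalone sketch of the idiom, with a plain Map standing in for Kafka's Struct and its hasField/getBoolean accessors:

    import java.util.HashMap;
    import java.util.Map;

    public class OptionalFieldSketch {
        // Mirrors: struct.hasField("is_new") ? struct.getBoolean("is_new") : false
        static boolean readIsNew(Map<String, Object> partitionState) {
            Object isNew = partitionState.get("is_new");
            return isNew != null && (Boolean) isNew;
        }

        public static void main(String[] args) {
            Map<String, Object> v0 = new HashMap<>(); // old schema: no is_new field
            Map<String, Object> v1 = new HashMap<>();
            v1.put("is_new", true);                   // new schema carries the field
            System.out.println(readIsNew(v0)); // false (default)
            System.out.println(readIsNew(v1)); // true
        }
    }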

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 35431d8..a898634 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -172,6 +172,13 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     /**
+     * Close the file handles used by the FileChannel, but do not flush to disk. This is used when the disk may have failed.
+     */
+    public void closeHandlers() throws IOException {
+        channel.close();
+    }
+
+    /**
      * Delete this message set from the filesystem
      * @return True iff this message set was deleted.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 824a76f..f0f516c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -203,7 +203,7 @@ public class FetchResponse extends AbstractResponse {
                 long logStartOffset = INVALID_LOG_START_OFFSET;
                 if (partitionResponseHeader.hasField(LOG_START_OFFSET_KEY_NAME))
                     logStartOffset = partitionResponseHeader.getLong(LOG_START_OFFSET_KEY_NAME);
-                
+
                 Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
 
                 List<AbortedTransaction> abortedTransactions = null;
@@ -326,10 +326,17 @@ public class FetchResponse extends AbstractResponse {
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
+                short errorCode = fetchPartitionData.error.code();
+                // If the consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code
+                // for KafkaStorageException. In this case the client library will translate KafkaStorageException to
+                // UnknownServerException, which is not retriable. We can ensure that the consumer will update metadata and retry
+                // by converting the KafkaStorageException to NotLeaderForPartitionException in the response if the FetchRequest version is <= 5
+                if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 5)
+                    errorCode = Errors.NOT_LEADER_FOR_PARTITION.code();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME);
                 partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code());
+                partitionDataHeader.set(ERROR_CODE_KEY_NAME, errorCode);
                 partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
 
                 if (partitionDataHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME)) {
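
The rewrite rule in the hunk above, restated as a standalone sketch (error code values 56 and 6 are the ones defined for KAFKA_STORAGE_ERROR and NOT_LEADER_FOR_PARTITION in Errors.java):

    public class ErrorDownconversionSketch {
        static final short KAFKA_STORAGE_ERROR = 56;
        static final short NOT_LEADER_FOR_PARTITION = 6;

        // For fetch requests at version 5 or below, KAFKA_STORAGE_ERROR is rewritten
        // to NOT_LEADER_FOR_PARTITION so that old clients, which would map the unknown
        // code to a non-retriable UnknownServerException, still refresh metadata and retry.
        static short maybeDownconvert(short errorCode, short requestVersion) {
            if (errorCode == KAFKA_STORAGE_ERROR && requestVersion <= 5)
                return NOT_LEADER_FOR_PARTITION;
            return errorCode;
        }

        public static void main(String[] args) {
            System.out.println(maybeDownconvert(KAFKA_STORAGE_ERROR, (short) 5)); // 6
            System.out.println(maybeDownconvert(KAFKA_STORAGE_ERROR, (short) 6)); // 56
        }
    }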

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 1fdb4a2..733c9af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -45,6 +45,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     private static final String ISR_KEY_NAME = "isr";
     private static final String ZK_VERSION_KEY_NAME = "zk_version";
     private static final String REPLICAS_KEY_NAME = "replicas";
+    private static final String IS_NEW_KEY_NAME = "is_new";
 
     // live_leaders key names
     private static final String END_POINT_ID_KEY_NAME = "id";
@@ -57,9 +58,9 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         private final Map<TopicPartition, PartitionState> partitionStates;
         private final Set<Node> liveLeaders;
 
-        public Builder(int controllerId, int controllerEpoch,
+        public Builder(short version, int controllerId, int controllerEpoch,
                        Map<TopicPartition, PartitionState> partitionStates, Set<Node> liveLeaders) {
-            super(ApiKeys.LEADER_AND_ISR);
+            super(ApiKeys.LEADER_AND_ISR, version);
             this.controllerId = controllerId;
             this.controllerEpoch = controllerEpoch;
             this.partitionStates = partitionStates;
@@ -121,10 +122,10 @@ public class LeaderAndIsrRequest extends AbstractRequest {
             List<Integer> replicas = new ArrayList<>(replicasArray.length);
             for (Object r : replicasArray)
                 replicas.add((Integer) r);
+            boolean isNew = partitionStateData.hasField(IS_NEW_KEY_NAME) ? partitionStateData.getBoolean(IS_NEW_KEY_NAME) : false;
 
-            PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
+            PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, isNew);
             partitionStates.put(new TopicPartition(topic, partition), partitionState);
-
         }
 
         Set<Node> leaders = new HashSet<>();
@@ -162,6 +163,8 @@ public class LeaderAndIsrRequest extends AbstractRequest {
             partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
             partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
             partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+            if (partitionStateData.hasField(IS_NEW_KEY_NAME))
+                partitionStateData.set(IS_NEW_KEY_NAME, partitionState.isNew);
             partitionStatesData.add(partitionStateData);
         }
         struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
@@ -188,6 +191,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 return new LeaderAndIsrResponse(Errors.NONE, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 0493f3d..8aa2fc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -135,6 +135,7 @@ public class MetadataRequest extends AbstractRequest {
                 return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
             case 3:
             case 4:
+            case 5:
                 return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index b798764..10f5c13 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -80,6 +80,7 @@ public class MetadataResponse extends AbstractResponse {
     private static final String LEADER_KEY_NAME = "leader";
     private static final String REPLICAS_KEY_NAME = "replicas";
     private static final String ISR_KEY_NAME = "isr";
+    private static final String OFFLINE_REPLICAS_KEY_NAME = "offline_replicas";
 
     private final int throttleTimeMs;
     private final Collection<Node> brokers;
@@ -149,26 +150,18 @@ public class MetadataResponse extends AbstractResponse {
                 int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
                 int leader = partitionInfo.getInt(LEADER_KEY_NAME);
                 Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
 
-                List<Node> replicaNodes = new ArrayList<>(replicas.length);
-                for (Object replicaNodeId : replicas) {
-                    if (brokers.containsKey(replicaNodeId))
-                        replicaNodes.add(brokers.get(replicaNodeId));
-                    else
-                        replicaNodes.add(new Node((int) replicaNodeId, "", -1));
-                }
+                Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
+                List<Node> replicaNodes = convertToNodes(brokers, replicas);
 
                 Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
-                List<Node> isrNodes = new ArrayList<>(isr.length);
-                for (Object isrNode : isr) {
-                    if (brokers.containsKey(isrNode))
-                        isrNodes.add(brokers.get(isrNode));
-                    else
-                        isrNodes.add(new Node((int) isrNode, "", -1));
-                }
+                List<Node> isrNodes = convertToNodes(brokers, isr);
+
+                Object[] offlineReplicas = partitionInfo.hasField(OFFLINE_REPLICAS_KEY_NAME) ?
+                    (Object[]) partitionInfo.get(OFFLINE_REPLICAS_KEY_NAME) : new Object[0];
+                List<Node> offlineNodes = convertToNodes(brokers, offlineReplicas);
 
-                partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes));
+                partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes, offlineNodes));
             }
 
             topicMetadata.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadata));
@@ -179,6 +172,16 @@ public class MetadataResponse extends AbstractResponse {
         this.topicMetadata = topicMetadata;
     }
 
+    private List<Node> convertToNodes(Map<Integer, Node> brokers, Object[] brokerIds) {
+        List<Node> nodes = new ArrayList<>(brokerIds.length);
+        for (Object brokerId : brokerIds)
+            if (brokers.containsKey(brokerId))
+                nodes.add(brokers.get(brokerId));
+            else
+                nodes.add(new Node((int) brokerId, "", -1));
+        return nodes;
+    }
+
     private Node getControllerNode(int controllerId, Collection<Node> brokers) {
         for (Node broker : brokers) {
             if (broker.id() == controllerId)
@@ -256,7 +259,8 @@ public class MetadataResponse extends AbstractResponse {
                             partitionMetadata.partition,
                             partitionMetadata.leader,
                             partitionMetadata.replicas.toArray(new Node[0]),
-                            partitionMetadata.isr.toArray(new Node[0])));
+                            partitionMetadata.isr.toArray(new Node[0]),
+                            partitionMetadata.offlineReplicas.toArray(new Node[0])));
             }
         }
 
@@ -334,23 +338,27 @@ public class MetadataResponse extends AbstractResponse {
 
     }
 
+    // This is used to describe per-partition state in the MetadataResponse
     public static class PartitionMetadata {
         private final Errors error;
         private final int partition;
         private final Node leader;
         private final List<Node> replicas;
         private final List<Node> isr;
+        private final List<Node> offlineReplicas;
 
         public PartitionMetadata(Errors error,
                                  int partition,
                                  Node leader,
                                  List<Node> replicas,
-                                 List<Node> isr) {
+                                 List<Node> isr,
+                                 List<Node> offlineReplicas) {
             this.error = error;
             this.partition = partition;
             this.leader = leader;
             this.replicas = replicas;
             this.isr = isr;
+            this.offlineReplicas = offlineReplicas;
         }
 
         public Errors error() {
@@ -373,6 +381,10 @@ public class MetadataResponse extends AbstractResponse {
             return isr;
         }
 
+        public List<Node> offlineReplicas() {
+            return offlineReplicas;
+        }
+
         @Override
         public String toString() {
             return "(type=PartitionMetadata," +
@@ -380,7 +392,8 @@ public class MetadataResponse extends AbstractResponse {
                     ", partition=" + partition +
                     ", leader=" + leader +
                     ", replicas=" + Utils.join(replicas, ",") +
-                    ", isr=" + Utils.join(isr, ",") + ')';
+                    ", isr=" + Utils.join(isr, ",") +
+                    ", offlineReplicas=" + Utils.join(offlineReplicas, ",") + ')';
         }
     }
 
@@ -433,6 +446,12 @@ public class MetadataResponse extends AbstractResponse {
                 for (Node node : partitionMetadata.isr)
                     isr.add(node.id());
                 partitionData.set(ISR_KEY_NAME, isr.toArray());
+                if (partitionData.hasField(OFFLINE_REPLICAS_KEY_NAME)) {
+                    ArrayList<Integer> offlineReplicas = new ArrayList<>(partitionMetadata.offlineReplicas.size());
+                    for (Node node : partitionMetadata.offlineReplicas)
+                        offlineReplicas.add(node.id());
+                    partitionData.set(OFFLINE_REPLICAS_KEY_NAME, offlineReplicas.toArray());
+                }
                 partitionMetadataArray.add(partitionData);
 
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
index 394a60f..be303d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.util.List;
 
+// This is used to describe per-partition state in the LeaderAndIsrRequest
 public class PartitionState {
     public final int controllerEpoch;
     public final int leader;
@@ -27,14 +28,16 @@ public class PartitionState {
     public final List<Integer> isr;
     public final int zkVersion;
     public final List<Integer> replicas;
+    public final boolean isNew;
 
-    public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas) {
+    public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas, boolean isNew) {
         this.controllerEpoch = controllerEpoch;
         this.leader = leader;
         this.leaderEpoch = leaderEpoch;
         this.isr = isr;
         this.zkVersion = zkVersion;
         this.replicas = replicas;
+        this.isNew = isNew;
     }
 
     @Override
@@ -44,6 +47,7 @@ public class PartitionState {
                 ", leaderEpoch=" + leaderEpoch +
                 ", isr=" + Utils.join(isr, ",") +
                 ", zkVersion=" + zkVersion +
-                ", replicas=" + Utils.join(replicas, ",") + ")";
+                ", replicas=" + Utils.join(replicas, ",") +
+                ", isNew=" + isNew + ")";
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 3d696c1..089d199 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -246,6 +246,7 @@ public class ProduceRequest extends AbstractRequest {
             case 1:
             case 2:
             case 3:
+            case 4:
                 return new ProduceResponse(responseMap, throttleTimeMs);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -307,6 +308,7 @@ public class ProduceRequest extends AbstractRequest {
                 return RecordBatch.MAGIC_VALUE_V1;
 
             case 3:
+            case 4:
                 return RecordBatch.MAGIC_VALUE_V2;
 
             default:

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index d42f1c6..fdfba8b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -122,9 +122,16 @@ public class ProduceResponse extends AbstractResponse {
             List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
                 PartitionResponse part = partitionEntry.getValue();
+                short errorCode = part.error.code();
+                // If the producer sends ProduceRequest V3 or earlier, the client library is not guaranteed to recognize the error code
+                // for KafkaStorageException. In this case the client library will translate KafkaStorageException to
+                // UnknownServerException, which is not retriable. We can ensure that the producer will update metadata and retry
+                // by converting the KafkaStorageException to NotLeaderForPartitionException in the response if the ProduceRequest version is <= 3
+                if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3)
+                    errorCode = Errors.NOT_LEADER_FOR_PARTITION.code();
                 Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
                         .set(PARTITION_KEY_NAME, partitionEntry.getKey())
-                        .set(ERROR_CODE_KEY_NAME, part.error.code())
+                        .set(ERROR_CODE_KEY_NAME, errorCode)
                         .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
                 if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
                     partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);

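The error translation above can be illustrated in isolation. A minimal standalone sketch of the version gate (the numeric codes follow the Kafka protocol, 6 for NOT_LEADER_FOR_PARTITION and 56 for KAFKA_STORAGE_ERROR; the class and helper names are illustrative, not part of Kafka's API):

public class ErrorDownConversion {
    static final short NOT_LEADER_FOR_PARTITION = 6;
    static final short KAFKA_STORAGE_ERROR = 56;

    // Pre-V4 producers would map the unknown storage error code to the
    // non-retriable UnknownServerException, so substitute a retriable
    // error that also forces a metadata refresh.
    static short errorCodeForWire(short errorCode, short requestVersion) {
        if (errorCode == KAFKA_STORAGE_ERROR && requestVersion <= 3)
            return NOT_LEADER_FOR_PARTITION;
        return errorCode;
    }

    public static void main(String[] args) {
        System.out.println(errorCodeForWire(KAFKA_STORAGE_ERROR, (short) 3)); // prints 6
        System.out.println(errorCodeForWire(KAFKA_STORAGE_ERROR, (short) 4)); // prints 56
    }
}
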
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 8f9b592..1e20866 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -74,6 +75,43 @@ public class UpdateMetadataRequest extends AbstractRequest {
         }
     }
 
+    public static final class PartitionState {
+        public final int controllerEpoch;
+        public final int leader;
+        public final int leaderEpoch;
+        public final List<Integer> isr;
+        public final int zkVersion;
+        public final List<Integer> replicas;
+        public final List<Integer> offlineReplicas;
+
+        public PartitionState(int controllerEpoch,
+                              int leader,
+                              int leaderEpoch,
+                              List<Integer> isr,
+                              int zkVersion,
+                              List<Integer> replicas,
+                              List<Integer> offlineReplicas) {
+            this.controllerEpoch = controllerEpoch;
+            this.leader = leader;
+            this.leaderEpoch = leaderEpoch;
+            this.isr = isr;
+            this.zkVersion = zkVersion;
+            this.replicas = replicas;
+            this.offlineReplicas = offlineReplicas;
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionState(controllerEpoch=" + controllerEpoch +
+                ", leader=" + leader +
+                ", leaderEpoch=" + leaderEpoch +
+                ", isr=" + Arrays.toString(isr.toArray()) +
+                ", zkVersion=" + zkVersion +
+                ", replicas=" + Arrays.toString(replicas.toArray()) +
+                ", offlineReplicas=" + Arrays.toString(replicas.toArray()) + ")";
+        }
+    }
+
     public static final class Broker {
         public final int id;
         public final List<EndPoint> endPoints;
@@ -129,6 +167,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     private static final String ISR_KEY_NAME = "isr";
     private static final String ZK_VERSION_KEY_NAME = "zk_version";
     private static final String REPLICAS_KEY_NAME = "replicas";
+    private static final String OFFLINE_REPLICAS_KEY_NAME = "offline_replicas";
 
     // Broker key names
     private static final String BROKER_ID_KEY_NAME = "id";
@@ -146,8 +185,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
     private final Map<TopicPartition, PartitionState> partitionStates;
     private final Set<Broker> liveBrokers;
 
-    private UpdateMetadataRequest(short version, int controllerId, int controllerEpoch, Map<TopicPartition,
-            PartitionState> partitionStates, Set<Broker> liveBrokers) {
+    private UpdateMetadataRequest(short version, int controllerId, int controllerEpoch,
+                                  Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) {
         super(version);
         this.controllerId = controllerId;
         this.controllerEpoch = controllerEpoch;
@@ -178,9 +217,16 @@ public class UpdateMetadataRequest extends AbstractRequest {
             for (Object r : replicasArray)
                 replicas.add((Integer) r);
 
-            PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
-            partitionStates.put(new TopicPartition(topic, partition), partitionState);
+            List<Integer> offlineReplicas = new ArrayList<>();
+            if (partitionStateData.hasField(OFFLINE_REPLICAS_KEY_NAME)) {
+                Object[] offlineReplicasArray = partitionStateData.getArray(OFFLINE_REPLICAS_KEY_NAME);
+                for (Object r : offlineReplicasArray)
+                    offlineReplicas.add((Integer) r);
+            }
 
+            PartitionState partitionState =
+                new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, offlineReplicas);
+            partitionStates.put(new TopicPartition(topic, partition), partitionState);
         }
 
         Set<Broker> liveBrokers = new HashSet<>();
@@ -245,6 +291,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
             partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
             partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
             partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+            if (partitionStateData.hasField(OFFLINE_REPLICAS_KEY_NAME))
+                partitionStateData.set(OFFLINE_REPLICAS_KEY_NAME, partitionState.offlineReplicas.toArray());
             partitionStatesData.add(partitionStateData);
         }
         struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
@@ -286,7 +334,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
-        if (versionId <= 3)
+        if (versionId <= 4)
             return new UpdateMetadataResponse(Errors.forException(e));
         else
             throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

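The hasField guards in the hunk above are the compatibility mechanism: offline_replicas is written only when the schema for the negotiated request version defines the field, so V3 and older requests serialize unchanged. A toy sketch of that pattern, using a plain map in place of Kafka's Struct (all names here are ours):

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SchemaGatedField {
    public static void main(String[] args) {
        Set<String> v3Fields = new HashSet<>(Arrays.asList("isr", "replicas"));
        Set<String> v4Fields = new HashSet<>(Arrays.asList("isr", "replicas", "offline_replicas"));

        for (Set<String> schema : Arrays.asList(v3Fields, v4Fields)) {
            Map<String, List<Integer>> struct = new LinkedHashMap<>();
            struct.put("isr", Arrays.asList(1, 2));
            struct.put("replicas", Arrays.asList(1, 2, 3));
            if (schema.contains("offline_replicas"))   // mirrors partitionStateData.hasField(...)
                struct.put("offline_replicas", Arrays.asList(3));
            System.out.println(struct);                // V3 omits the field, V4 includes it
        }
    }
}
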
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index afebd9d..dd1c79a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -124,7 +124,7 @@ public class AbstractCoordinatorTest {
                     throw e;
                 return false;
             }
-        }, heartbeatResponse(Errors.UNKNOWN));
+        }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
 
         try {
             coordinator.ensureActiveGroup();
@@ -499,7 +499,7 @@ public class AbstractCoordinatorTest {
                     heartbeatReceived.set(true);
                 return isHeartbeatRequest;
             }
-        }, heartbeatResponse(Errors.UNKNOWN));
+        }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
         return heartbeatReceived;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fc0ddff..3c1b411 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -621,7 +621,7 @@ public class ConsumerCoordinatorTest {
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_SERVER_ERROR));
         coordinator.joinGroupIfNeeded();
     }
 
@@ -1267,7 +1267,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with invalid partitions should throw if we have no callback
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN)), false);
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_SERVER_ERROR)), false);
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index c0edcfd..bb3162a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1979,7 +1979,8 @@ public class FetcherTest {
                         partitionInfo.partition(),
                         partitionInfo.leader(),
                         Arrays.asList(partitionInfo.replicas()),
-                        Arrays.asList(partitionInfo.inSyncReplicas())));
+                        Arrays.asList(partitionInfo.inSyncReplicas()),
+                        Arrays.asList(partitionInfo.offlineReplicas())));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index b655743..9bac895 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1758,7 +1758,7 @@ public class TransactionManagerTest {
 
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
-        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch);
+        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, consumerGroupId, pid, epoch);
         sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
         assertFalse(abortResult.isCompleted());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java b/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java
index 7836023..7511d64 100644
--- a/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java
@@ -20,20 +20,21 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class PartitionInfoTest {
-    
+
     @Test
     public void testToString() {
         String topic = "sample";
         int partition = 0;
         Node leader = new Node(0, "localhost", 9092);
         Node r1 = new Node(1, "localhost", 9093);
-        Node r2 = new Node(2, "localhost", 9094); 
+        Node r2 = new Node(2, "localhost", 9094);
         Node[] replicas = new Node[] {leader, r1, r2};
-        Node[] inSyncReplicas = new Node[] {leader, r1, r2};
-        PartitionInfo partitionInfo = new PartitionInfo(topic, partition, leader, replicas, inSyncReplicas);
-        
-        String expected = String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s)",
-                topic, partition, leader.idString(), "[0,1,2]", "[0,1,2]");
+        Node[] inSyncReplicas = new Node[] {leader, r1};
+        Node[] offlineReplicas = new Node[] {r2};
+        PartitionInfo partitionInfo = new PartitionInfo(topic, partition, leader, replicas, inSyncReplicas, offlineReplicas);
+
+        String expected = String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)",
+                topic, partition, leader.idString(), "[0,1,2]", "[0,1]", "[2]");
         Assert.assertEquals(expected, partitionInfo.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
index 4a96ac4..e424384 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
@@ -74,12 +74,12 @@ public class ErrorsTest {
     @Test
     public void testForExceptionDefault() {
         Errors error = Errors.forException(new ApiException());
-        assertEquals("forException should default to unknown", Errors.UNKNOWN, error);
+        assertEquals("forException should default to unknown", Errors.UNKNOWN_SERVER_ERROR, error);
     }
 
     @Test
     public void testExceptionName() {
-        String exceptionName = Errors.UNKNOWN.exceptionName();
+        String exceptionName = Errors.UNKNOWN_SERVER_ERROR.exceptionName();
         assertEquals("org.apache.kafka.common.errors.UnknownServerException", exceptionName);
         exceptionName = Errors.NONE.exceptionName();
         assertNull(exceptionName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a3c277f..a1c2a83 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -723,10 +723,11 @@ public class RequestResponseTest {
         Node node = new Node(1, "host1", 1001);
         List<Node> replicas = asList(node);
         List<Node> isr = asList(node);
+        List<Node> offlineReplicas = asList();
 
         List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
-                asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
+                asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr, offlineReplicas))));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
                 Collections.<MetadataResponse.PartitionMetadata>emptyList()));
 
@@ -807,18 +808,17 @@ public class RequestResponseTest {
         List<Integer> isr = asList(1, 2);
         List<Integer> replicas = asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas, false));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas, false));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas, false));
 
         Set<Node> leaders = Utils.mkSet(
                 new Node(0, "test0", 1223),
                 new Node(1, "test1", 1223)
         );
-
-        return new LeaderAndIsrRequest.Builder(1, 10, partitionStates, leaders).build();
+        return new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), 1, 10, partitionStates, leaders).build();
     }
 
     private LeaderAndIsrResponse createLeaderAndIsrResponse() {
@@ -828,15 +828,16 @@ public class RequestResponseTest {
     }
 
     private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
-        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+        Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = asList(1, 2);
         List<Integer> replicas = asList(1, 2, 3, 4);
+        List<Integer> offlineReplicas = asList();
         partitionStates.put(new TopicPartition("topic5", 105),
-                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas));
+            new UpdateMetadataRequest.PartitionState(0, 2, 1, isr, 2, replicas, offlineReplicas));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas));
+            new UpdateMetadataRequest.PartitionState(1, 1, 1, isr, 2, replicas, offlineReplicas));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas));
+            new UpdateMetadataRequest.PartitionState(1, 0, 1, isr, 2, replicas, offlineReplicas));
 
         SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT;
         List<UpdateMetadataRequest.EndPoint> endPoints1 = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7bd626a..2baed02 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -66,7 +66,7 @@ class AdminClient(val time: Time,
       } finally {
         pendingFutures.asScala.foreach { future =>
           try {
-            future.raise(Errors.UNKNOWN)
+            future.raise(Errors.UNKNOWN_SERVER_ERROR)
           } catch {
             case _: IllegalStateException => // It is OK if the future has been completed
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 923ceb7..7de85e4 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -408,7 +408,7 @@ object AdminUtils extends Logging with AdminUtilities {
     zkUtils.pathExists(getTopicPath(topic))
 
   def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
-                        brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+                         brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
     val allBrokers = zkUtils.getAllBrokersInCluster()
     val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
     val brokersWithRack = brokers.filter(_.rack.nonEmpty)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 62d5d12..05658dd 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -69,7 +69,10 @@ object ApiVersion {
     "0.11.0-IV1" -> KAFKA_0_11_0_IV1,
     // Introduced leader epoch fetches to the replica fetcher via KIP-101
     "0.11.0-IV2" -> KAFKA_0_11_0_IV2,
-    "0.11.0" -> KAFKA_0_11_0_IV2
+    "0.11.0" -> KAFKA_0_11_0_IV2,
+    // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112
+    "0.11.1-IV0" -> KAFKA_0_11_1_IV0,
+    "0.11.1" -> KAFKA_0_11_1_IV0
   )
 
   private val versionPattern = "\\.".r
@@ -171,3 +174,10 @@ case object KAFKA_0_11_0_IV2 extends ApiVersion {
   val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
   val id: Int = 12
 }
+
+case object KAFKA_0_11_1_IV0 extends ApiVersion {
+  val version: String = "0.11.1-IV0"
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val id: Int = 13
+}
+

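The new map entries tie the wire-format change to inter.broker.protocol.version: brokers use the KIP-112 request versions only once the configured ApiVersion is at least 0.11.1-IV0. A rough Java sketch of that gating (the ids are copied from the diff; the method is illustrative, and it assumes V3 was the previous latest UpdateMetadataRequest version):

public class InterBrokerGate {
    static final int KAFKA_0_11_0_IV2 = 12; // pre-KIP-112
    static final int KAFKA_0_11_1_IV0 = 13; // introduced above

    // The controller must not send UpdateMetadataRequest V4 (which carries
    // offline_replicas) to brokers still on the older protocol version.
    static short updateMetadataRequestVersion(int interBrokerProtocolId) {
        return interBrokerProtocolId >= KAFKA_0_11_1_IV0 ? (short) 4 : (short) 3;
    }

    public static void main(String[] args) {
        System.out.println(updateMetadataRequestVersion(KAFKA_0_11_0_IV2)); // 3
        System.out.println(updateMetadataRequestVersion(KAFKA_0_11_1_IV0)); // 4
    }
}
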
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index 474d7a0..e92dc33 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -50,13 +50,26 @@ case class LeaderAndIsr(leader: Int,
   }
 }
 
-case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int]) {
+case class LeaderAndIsrPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int], isNew: Boolean) {
 
   override def toString: String = {
     val partitionStateInfo = new StringBuilder
     partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
     partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
     partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
+    partitionStateInfo.append(",isNew:" + isNew + ")")
+    partitionStateInfo.toString()
+  }
+}
+
+case class MetadataPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int], offlineReplicas: Seq[Int]) {
+
+  override def toString: String = {
+    val partitionStateInfo = new StringBuilder
+    partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
+    partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
+    partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
+    partitionStateInfo.append(",OfflineReplicas:" + offlineReplicas.mkString(",") + ")")
     partitionStateInfo.toString()
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ebf3140..2c4767b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,9 +16,8 @@
  */
 package kafka.cluster
 
-import java.io.IOException
-import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
@@ -46,12 +45,15 @@ import scala.collection.JavaConverters._
 class Partition(val topic: String,
                 val partitionId: Int,
                 time: Time,
-                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
+                replicaManager: ReplicaManager,
+                val isOffline: Boolean = false) extends Logging with KafkaMetricsGroup {
+
   val topicPartition = new TopicPartition(topic, partitionId)
 
-  private val localBrokerId = replicaManager.config.brokerId
-  private val logManager = replicaManager.logManager
-  private val zkUtils = replicaManager.zkUtils
+  // Do not use replicaManager if this partition is ReplicaManager.OfflinePartition
+  private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1
+  private val logManager = if (!isOffline) replicaManager.logManager else null
+  private val zkUtils = if (!isOffline) replicaManager.zkUtils else null
   private val assignedReplicaMap = new Pool[Int, Replica]
   // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock
@@ -71,56 +73,59 @@ class Partition(val topic: String,
   private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId
   val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
 
-  newGauge("UnderReplicated",
-    new Gauge[Int] {
-      def value = {
-        if (isUnderReplicated) 1 else 0
-      }
-    },
-    tags
-  )
-
-  newGauge("InSyncReplicasCount",
-    new Gauge[Int] {
-      def value = {
-        if (isLeaderReplicaLocal) inSyncReplicas.size else 0
-      }
-    },
-    tags
-  )
-
-  newGauge("ReplicasCount",
-    new Gauge[Int] {
-      def value = {
-        if (isLeaderReplicaLocal) assignedReplicas.size else 0
-      }
-    },
-    tags
-  )
-
-  newGauge("LastStableOffsetLag",
-    new Gauge[Long] {
-      def value = {
-        leaderReplicaIfLocal.map { replica =>
-          replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
-        }.getOrElse(0)
-      }
-    },
-    tags
-  )
+  // Do not create metrics if this partition is ReplicaManager.OfflinePartition
+  if (!isOffline) {
+    newGauge("UnderReplicated",
+      new Gauge[Int] {
+        def value = {
+          if (isUnderReplicated) 1 else 0
+        }
+      },
+      tags
+    )
+
+    newGauge("InSyncReplicasCount",
+      new Gauge[Int] {
+        def value = {
+          if (isLeaderReplicaLocal) inSyncReplicas.size else 0
+        }
+      },
+      tags
+    )
+
+    newGauge("ReplicasCount",
+      new Gauge[Int] {
+        def value = {
+          if (isLeaderReplicaLocal) assignedReplicas.size else 0
+        }
+      },
+      tags
+    )
+
+    newGauge("LastStableOffsetLag",
+      new Gauge[Long] {
+        def value = {
+          leaderReplicaIfLocal.map { replica =>
+            replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
+          }.getOrElse(0)
+        }
+      },
+      tags
+    )
+  }
 
   private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined
 
   def isUnderReplicated: Boolean =
     isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size
 
-  def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
+  def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
     assignedReplicaMap.getAndMaybePut(replicaId, {
       if (isReplicaLocal(replicaId)) {
         val config = LogConfig.fromProps(logManager.defaultConfig.originals,
                                          AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
-        val log = logManager.createLog(topicPartition, config)
-        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
+        val log = logManager.getOrCreateLog(topicPartition, config, isNew)
+        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
         val offsetMap = checkpoint.read
         if (!offsetMap.contains(topicPartition))
           info(s"No checkpointed highwatermark is found for partition $topicPartition")
@@ -151,14 +156,8 @@ class Partition(val topic: String,
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
-      try {
-        logManager.asyncDelete(topicPartition)
-        removePartitionMetrics()
-      } catch {
-        case e: IOException =>
-          fatal(s"Error deleting the log for partition $topicPartition", e)
-          Exit.halt(1)
-      }
+      removePartitionMetrics()
+      logManager.asyncDelete(topicPartition)
     }
   }
 
@@ -176,7 +175,7 @@ class Partition(val topic: String,
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.controllerEpoch
       // add replicas that are new
-      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
+      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
@@ -185,7 +184,7 @@ class Partition(val topic: String,
 
       //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionStateInfo.leaderEpoch
-      allReplicas.foreach(id => getOrCreateReplica(id))
+      allReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
 
       zkVersion = partitionStateInfo.zkVersion
       val isNewLeader =
@@ -230,7 +229,7 @@ class Partition(val topic: String,
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.controllerEpoch
       // add replicas that are new
-      allReplicas.foreach(r => getOrCreateReplica(r))
+      allReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = Set.empty[Replica]
@@ -557,7 +556,7 @@ class Partition(val topic: String,
   /**
    * remove deleted log metrics
    */
-  private def removePartitionMetrics() {
+  def removePartitionMetrics() {
     removeMetric("UnderReplicated", tags)
     removeMetric("InSyncReplicasCount", tags)
     removeMetric("ReplicasCount", tags)
@@ -565,12 +564,12 @@ class Partition(val topic: String,
   }
 
   override def equals(that: Any): Boolean = that match {
-    case other: Partition => partitionId == other.partitionId && topic == other.topic
+    case other: Partition => partitionId == other.partitionId && topic == other.topic && isOffline == other.isOffline
     case _ => false
   }
 
   override def hashCode: Int =
-    31 + topic.hashCode + 17 * partitionId
+    31 + topic.hashCode + 17 * partitionId + (if (isOffline) 1 else 0)
 
   override def toString: String = {
     val partitionString = new StringBuilder

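The isOffline parameter above effectively turns Partition into a sentinel (the ReplicaManager.OfflinePartition referenced in the comments) that must never dereference replicaManager, while equals and hashCode are widened so the sentinel never collides with a live partition of the same topic and id. A toy Java sketch of the same guard-plus-identity pattern (all names here are ours, not Kafka's):

public class OfflineSentinel {
    private final String topic;
    private final boolean isOffline;
    private final String handle; // stands in for logManager/zkUtils

    OfflineSentinel(String topic, boolean isOffline) {
        this.topic = topic;
        this.isOffline = isOffline;
        // An offline instance skips every dependency it must not touch.
        this.handle = isOffline ? null : "live-handle";
    }

    @Override
    public boolean equals(Object that) {
        return that instanceof OfflineSentinel
                && ((OfflineSentinel) that).topic.equals(topic)
                && ((OfflineSentinel) that).isOffline == isOffline;
    }

    @Override
    public int hashCode() {
        return 31 + topic.hashCode() + (isOffline ? 1 : 0);
    }

    public static void main(String[] args) {
        // Distinct identities: an offline sentinel never matches a live instance.
        System.out.println(new OfflineSentinel("t", true).equals(new OfflineSentinel("t", false))); // false
    }
}
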
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/common/KafkaStorageException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/KafkaStorageException.scala b/core/src/main/scala/kafka/common/KafkaStorageException.scala
index 21dd583..e0ecff3 100644
--- a/core/src/main/scala/kafka/common/KafkaStorageException.scala
+++ b/core/src/main/scala/kafka/common/KafkaStorageException.scala
@@ -17,8 +17,10 @@
 package kafka.common
 
 /**
- * Kafka exception caused by real IOs.
-*/
+ * Kafka exception caused by a disk-related IOException.
+ * This class is deprecated and will be replaced by org.apache.kafka.common.errors.KafkaStorageException.
+ */
+@Deprecated
 class KafkaStorageException(message: String, t: Throwable) extends RuntimeException(message, t) {
   def this(message: String) = this(message, null)
   def this(t: Throwable) = this("", t)

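The deprecation points callers at the new Java exception added by this commit (clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java). A hedged usage sketch, assuming the (String message) constructor declared in the new class and kafka-clients on the classpath:

import org.apache.kafka.common.errors.KafkaStorageException;

public class StorageErrorDemo {
    public static void main(String[] args) {
        try {
            // The new exception maps to Errors.KAFKA_STORAGE_ERROR on the wire.
            throw new KafkaStorageException("log directory /data1 is offline");
        } catch (KafkaStorageException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}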
