kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: KAFKA-1250 Add logging to new producer.
Date Fri, 28 Feb 2014 02:06:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a810b8ecb -> f1a53b972


KAFKA-1250 Add logging to new producer.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1a53b97
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1a53b97
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1a53b97

Branch: refs/heads/trunk
Commit: f1a53b972eb1f8e75db54d3272d9eb7c398e238a
Parents: a810b8e
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Thu Feb 20 20:17:01 2014 -0800
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Thu Feb 27 18:04:50 2014 -0800

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 .../kafka/clients/producer/KafkaProducer.java   | 15 ++++-
 .../kafka/clients/producer/ProducerRecord.java  | 30 +++++-----
 .../clients/producer/internals/Metadata.java    |  6 ++
 .../producer/internals/RecordAccumulator.java   |  5 ++
 .../clients/producer/internals/RecordBatch.java | 23 ++++++--
 .../clients/producer/internals/Sender.java      | 53 +++++++++++++++---
 .../java/org/apache/kafka/common/Cluster.java   |  9 ++-
 .../main/java/org/apache/kafka/common/Node.java | 26 ++++-----
 .../org/apache/kafka/common/PartitionInfo.java  | 49 +++++++++++-----
 .../kafka/common/config/AbstractConfig.java     | 59 ++++++++++++++------
 .../kafka/common/metrics/JmxReporter.java       | 31 +++++-----
 .../apache/kafka/common/network/Selector.java   | 35 ++++++------
 .../kafka/common/requests/RequestHeader.java    | 30 +++++-----
 .../kafka/common/requests/RequestSend.java      | 30 +++++-----
 .../apache/kafka/common/utils/KafkaThread.java  | 33 +++++------
 .../org/apache/kafka/common/utils/Utils.java    | 27 ++++-----
 .../scala/kafka/server/ReplicaManager.scala     |  2 +-
 18 files changed, 291 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 069c2f6..29b1e0f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -310,6 +310,7 @@ project(':clients') {
   archivesBaseName = "kafka-clients"
 
   dependencies {
+    compile "org.slf4j:slf4j-api:1.7.6"
     testCompile 'com.novocode:junit-interface:0.9'
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 757f7a7..bedd2a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -45,6 +45,8 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.SystemTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A Kafka client that publishes records to the Kafka cluster.
@@ -56,6 +58,8 @@ import org.apache.kafka.common.utils.SystemTime;
  */
 public class KafkaProducer implements Producer {
 
+    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
+
     private final Partitioner partitioner;
     private final int maxRequestSize;
     private final long metadataFetchTimeoutMs;
@@ -85,6 +89,7 @@ public class KafkaProducer implements Producer {
     }
 
     private KafkaProducer(ProducerConfig config) {
+        log.trace("Starting the Kafka producer");
         this.metrics = new Metrics(new MetricConfig(),
                                    Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
                                    new SystemTime());
@@ -114,8 +119,10 @@ public class KafkaProducer implements Producer {
                                  config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                                  config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                                  new SystemTime());
-        this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true);
+        this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
         this.ioThread.start();
+        config.logUnused();
+        log.debug("Kafka producer started");
     }
 
     private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
@@ -124,7 +131,7 @@ public class KafkaProducer implements Producer {
             if (url != null && url.length() > 0) {
                 String[] pieces = url.split(":");
                 if (pieces.length != 2)
-                    throw new ConfigException("Invalid url in metadata.broker.list: " + url);
+                    throw new ConfigException("Invalid url in " + ProducerConfig.BROKER_LIST_CONFIG + ": " + url);
                 try {
                     InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
                     if (address.isUnresolved())
@@ -215,12 +222,14 @@ public class KafkaProducer implements Producer {
             int partition = partitioner.partition(record, cluster);
             ensureValidSize(record.key(), record.value());
             TopicPartition tp = new TopicPartition(record.topic(), partition);
+            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
             this.sender.wakeup();
             return future;
             // For API exceptions return them in the future;
             // for other exceptions throw directly
         } catch (ApiException e) {
+            log.debug("Exception occurred during message send:", e);
             if (callback != null)
                 callback.onCompletion(null, e);
             return new FutureFailure(e);
@@ -260,6 +269,7 @@ public class KafkaProducer implements Producer {
      */
     @Override
     public void close() {
+        log.trace("Closing the Kafka producer.");
         this.sender.initiateClose();
         try {
             this.ioThread.join();
@@ -267,6 +277,7 @@ public class KafkaProducer implements Producer {
             throw new KafkaException(e);
         }
         this.metrics.close();
+        log.debug("The Kafka producer has closed.");
     }
 
     private static class FutureFailure implements Future<RecordMetadata> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 034bf33..c3181b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 
@@ -97,4 +93,10 @@ public final class ProducerRecord {
         return partition;
     }
 
+    @Override
+    public String toString() {
+        String key = this.key == null ? "null" : ("byte[" + this.key.length + "]");
+        String value = this.value == null ? "null" : ("byte[" + this.value.length + "]");
+        return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index ce23168..db6e3a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -19,6 +19,8 @@ import java.util.Set;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A class encapsulating some of the logic around metadata.
@@ -30,6 +32,8 @@ import org.apache.kafka.common.errors.TimeoutException;
  */
 public final class Metadata {
 
+    private static final Logger log = LoggerFactory.getLogger(Metadata.class);
+
     private final long refreshBackoffMs;
     private final long metadataExpireMs;
     private long lastRefresh;
@@ -81,6 +85,7 @@ public final class Metadata {
                 topics.add(topic);
                 forceUpdate = true;
                 try {
+                    log.trace("Requesting metadata update for topic {}.", topic);
                     wait(maxWaitMs);
                 } catch (InterruptedException e) { /* this is fine, just try again */
                 }
@@ -127,6 +132,7 @@ public final class Metadata {
         this.lastRefresh = now;
         this.cluster = cluster;
         notifyAll();
+        log.debug("Updated cluster metadata to {}", cluster);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ce5cf27..6990274 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
@@ -43,6 +45,8 @@ import org.apache.kafka.common.utils.Utils;
  */
 public final class RecordAccumulator {
 
+    private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
+
     private volatile boolean closed;
     private int drainIndex;
     private final int batchSize;
@@ -126,6 +130,7 @@ public final class RecordAccumulator {
 
         // we don't have an in-progress record batch try to allocate a new batch
         int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
+        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
         ByteBuffer buffer = free.allocate(size);
         synchronized (dq) {
             RecordBatch first = dq.peekLast();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index ef8e658..c7fbf3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -16,10 +16,11 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A batch of records that is or will be sent.
@@ -27,6 +28,9 @@ import org.apache.kafka.common.record.MemoryRecords;
  * This class is not thread safe and external synchronization must be used when modifying it
  */
 public final class RecordBatch {
+
+    private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);
+
     public int recordCount = 0;
     public volatile int attempts = 0;
     public final long created;
@@ -64,11 +68,15 @@ public final class RecordBatch {
     /**
      * Complete the request
      * 
-     * @param offset The offset
+     * @param baseOffset The base offset of the messages assigned by the server
      * @param errorCode The error code or 0 if no error
      */
-    public void done(long offset, RuntimeException exception) {
-        this.produceFuture.done(topicPartition, offset, exception);
+    public void done(long baseOffset, RuntimeException exception) {
+        this.produceFuture.done(topicPartition, baseOffset, exception);
+        log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
+                  topicPartition,
+                  baseOffset,
+                  exception);
         // execute callbacks
         for (int i = 0; i < this.thunks.size(); i++) {
             try {
@@ -78,7 +86,7 @@ public final class RecordBatch {
                 else
                     thunk.callback.onCompletion(null, exception);
             } catch (Exception e) {
-                e.printStackTrace();
+                log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
             }
         }
     }
@@ -95,4 +103,9 @@ public final class RecordBatch {
             this.future = future;
         }
     }
+
+    @Override
+    public String toString() {
+        return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 541c5e1..7942623 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -45,6 +45,8 @@ import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
@@ -52,6 +54,8 @@ import org.apache.kafka.common.utils.Utils;
  */
 public class Sender implements Runnable {
 
+    private static final Logger log = LoggerFactory.getLogger(Sender.class);
+
     /* the state of each nodes connection */
     private final NodeStates nodeStates;
 
@@ -138,15 +142,19 @@ public class Sender implements Runnable {
      * The main run loop for the sender thread
      */
     public void run() {
+        log.trace("Starting Kafka producer I/O thread.");
+
         // main loop, runs until close is called
         while (running) {
             try {
                 run(time.milliseconds());
             } catch (Exception e) {
-                e.printStackTrace();
+                log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
         }
 
+        log.trace("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
+
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
@@ -155,12 +163,14 @@ public class Sender implements Runnable {
             try {
                 unsent = run(time.milliseconds());
             } catch (Exception e) {
-                e.printStackTrace();
+                log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
         } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0);
 
         // close all the connections
         this.selector.close();
+
+        log.trace("Shutdown of Kafka producer I/O thread has completed.");
     }
 
     /**
@@ -184,6 +194,13 @@ public class Sender implements Runnable {
         // create produce requests
         List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
         List<InFlightRequest> requests = collate(cluster, batches);
+
+        if (ready.size() > 0) {
+            log.trace("Partitions with complete batches: {}", ready);
+            log.trace("Partitions ready to initiate a request: {}", sendable);
+            log.trace("Created {} requests: {}", requests.size(), requests);
+        }
+
         for (int i = 0; i < requests.size(); i++) {
             InFlightRequest request = requests.get(i);
             this.inFlightRequests.add(request);
@@ -194,7 +211,7 @@ public class Sender implements Runnable {
         try {
             this.selector.poll(100L, sends);
         } catch (IOException e) {
-            e.printStackTrace();
+            log.error("Unexpected error during I/O in producer network thread", e);
         }
 
         // handle responses, connections, and disconnections
@@ -218,8 +235,10 @@ public class Sender implements Runnable {
             return;
 
         if (nodeStates.isConnected(node.id())) {
+            Set<String> topics = metadata.topics();
+            log.debug("Sending metadata update request for topics {} to node {}", topics, node.id());
             this.metadataFetchInProgress = true;
-            InFlightRequest request = metadataRequest(node.id(), metadata.topics());
+            InFlightRequest request = metadataRequest(node.id(), topics);
             sends.add(request.request);
             this.inFlightRequests.add(request);
         } else if (nodeStates.canConnect(node.id(), now)) {
@@ -308,6 +327,7 @@ public class Sender implements Runnable {
      */
     private void initiateConnect(Node node, long now) {
         try {
+            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
             selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
             this.nodeStates.connecting(node.id(), now);
         } catch (IOException e) {
@@ -315,6 +335,7 @@ public class Sender implements Runnable {
             nodeStates.disconnected(node.id());
             /* maybe the problem is our metadata, update it */
             metadata.forceUpdate();
+            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
         }
     }
 
@@ -325,6 +346,7 @@ public class Sender implements Runnable {
         // clear out the in-flight requests for the disconnected broker
         for (int node : disconnects) {
             nodeStates.disconnected(node);
+            log.debug("Node {} disconnected.", node);
             for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
                 if (request.batches != null) {
                     for (RecordBatch batch : request.batches.values()) {
@@ -347,8 +369,10 @@ public class Sender implements Runnable {
      * Record any connections that completed in our node state
      */
     private void handleConnects(List<Integer> connects) {
-        for (Integer id : connects)
+        for (Integer id : connects) {
+            log.debug("Completed connection to node {}", id);
             this.nodeStates.connected(id);
+        }
     }
 
     /**
@@ -359,6 +383,7 @@ public class Sender implements Runnable {
         for (NetworkSend send : sends) {
             Deque<InFlightRequest> requests = this.inFlightRequests.requestQueue(send.destination());
             InFlightRequest request = requests.peekFirst();
+            log.trace("Completed send of request to node {}: {}", request.request.destination(), request.request);
             if (!request.expectResponse) {
                 requests.pollFirst();
                 if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
@@ -382,12 +407,16 @@ public class Sender implements Runnable {
             short apiKey = req.request.header().apiKey();
             Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
             correlate(req.request.header(), header);
-            if (req.request.header().apiKey() == ApiKeys.PRODUCE.id)
+            if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) {
+                log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId());
                 handleProduceResponse(req, body, now);
-            else if (req.request.header().apiKey() == ApiKeys.METADATA.id)
+            } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
+                log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
+                                                                                                                        .correlationId());
                 handleMetadataResponse(body, now);
-            else
+            } else {
                 throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
+            }
         }
     }
 
@@ -399,6 +428,8 @@ public class Sender implements Runnable {
         // created which means we will get errors and no nodes until it exists
         if (cluster.nodes().size() > 0)
             this.metadata.update(cluster, now);
+        else
+            log.trace("Ignoring empty metadata response.");
     }
 
     /**
@@ -422,6 +453,7 @@ public class Sender implements Runnable {
                 RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
                 if (canRetry(batch, error)) {
                     // retry
+                    log.warn("Got error for topic-partition {}, retrying. Error: {}", topic, partition, error);
                     this.accumulator.reenqueue(batch, now);
                 } else {
                     // tell the user the result of their request
@@ -620,6 +652,11 @@ public class Sender implements Runnable {
             this.request = request;
             this.expectResponse = expectResponse;
         }
+
+        @Override
+        public String toString() {
+            return "InFlightRequest(expectResponse=" + expectResponse + ", batches=" + batches + ", request=" + request + ")";
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 5caaaae..426bd1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -73,9 +73,9 @@ public final class Cluster {
      */
     public static Cluster bootstrap(List<InetSocketAddress> addresses) {
         List<Node> nodes = new ArrayList<Node>();
-        int nodeId = Integer.MIN_VALUE;
+        int nodeId = -1;
         for (InetSocketAddress address : addresses)
-            nodes.add(new Node(nodeId++, address.getHostName(), address.getPort()));
+            nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
         return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
     }
 
@@ -117,4 +117,9 @@ public final class Cluster {
         return this.partitionsByTopic.get(topic);
     }
 
+    @Override
+    public String toString() {
+        return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
index 4197e50..0e47ff3 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common;
 
@@ -86,7 +82,7 @@ public class Node {
 
     @Override
     public String toString() {
-        return "Node(" + id + ", " + host + ", " + port + ")";
+        return "Node(" + (id < 0 ? "" : id + ", ") + host + ", " + port + ")";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index 08d66f1..b15aa2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common;
 
@@ -71,4 +67,29 @@ public class PartitionInfo {
         return inSyncReplicas;
     }
 
+    /* Render as "Partition(topic = t, partition = p, leader = id, replicas = [..], isr = [..])" using node ids */
+    @Override
+    public String toString() {
+        // the trailing ')' in the format balances the opening "Partition(" in log output
+        return String.format("Partition(topic = %s, partition = %d, leader = %d, replicas = %s, isr = %s)",
+                             topic, partition, leader.id(),
+                             fmtNodeIds(replicas),
+                             fmtNodeIds(inSyncReplicas));
+    }
+
+    /*
+     * Extract the node ids from each item in the array and format them for display as a bracketed,
+     * comma-separated list with no trailing comma, e.g. "[1,2,3]"; an empty array gives "[]".
+     */
+    private String fmtNodeIds(Node[] nodes) {
+        StringBuilder b = new StringBuilder("[");
+        for (int i = 0; i < nodes.length; i++) {
+            if (i > 0)
+                b.append(','); // separator only between ids
+            b.append(nodes[i].id());
+        }
+        b.append("]");
+        return b.toString();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index c3148e5..c989e25 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.config;
 
@@ -25,7 +21,8 @@ import java.util.Set;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Utils;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A convenient base class for configurations to extend.
@@ -34,10 +31,17 @@ import org.apache.kafka.common.utils.Utils;
  */
 public class AbstractConfig {
 
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /* configs for which values have been requested, used to detect unused configs */
     private final Set<String> used;
-    private final Map<String, Object> values;
+
+    /* the original values passed in by the user */
     private final Map<String, ?> originals;
 
+    /* the parsed values */
+    private final Map<String, Object> values;
+
     @SuppressWarnings("unchecked")
     public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
         /* check that all the keys are really strings */
@@ -47,6 +51,7 @@ public class AbstractConfig {
         this.originals = (Map<String, ?>) originals;
         this.values = definition.parse(this.originals);
         this.used = Collections.synchronizedSet(new HashSet<String>());
+        logAll();
     }
 
     protected Object get(String key) {
@@ -83,10 +88,30 @@ public class AbstractConfig {
 
     public Set<String> unused() {
         Set<String> keys = new HashSet<String>(originals.keySet());
-        keys.remove(used);
+        keys.removeAll(used);
         return keys;
     }
 
+    /* Log all parsed config values as a single INFO message headed by "<SimpleClassName> values: " */
+    private void logAll() {
+        StringBuilder message = new StringBuilder(getClass().getSimpleName());
+        message.append(" values: ").append(Utils.NL);
+        // one tab-indented "key = value" line per parsed entry
+        for (Map.Entry<String, Object> config : this.values.entrySet()) {
+            message.append('\t')
+                   .append(config.getKey())
+                   .append(" = ")
+                   .append(config.getValue())
+                   .append(Utils.NL);
+        }
+        log.info(message.toString());
+    }
+
+    public void logUnused() { // emit one WARN per key returned by unused(), with the value the user supplied
+        for (String key : unused()) // read originals: values holds only what definition.parse() produced (presumably no unknown keys)
+            log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.originals.get(key));
+    }
+
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index e08c349..c867c8d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics;
 
@@ -36,13 +32,16 @@ import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
 import org.apache.kafka.common.KafkaException;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Register metrics in JMX as dynamic mbeans based on the metric names
  */
 public class JmxReporter implements MetricsReporter {
 
+    private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
+
     private final String prefix;
     private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
 
@@ -160,7 +159,7 @@ public class JmxReporter implements MetricsReporter {
                     list.add(new Attribute(name, getAttribute(name)));
                 return list;
             } catch (Exception e) {
-                e.printStackTrace();
+                log.error("Error getting JMX attribute: ", e);
                 return new AttributeList();
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 678bfcc..f83189d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A selector interface for doing non-blocking multi-connection network I/O.
@@ -58,6 +60,8 @@ import org.apache.kafka.common.KafkaException;
  */
 public class Selector implements Selectable {
 
+    private static final Logger log = LoggerFactory.getLogger(Selector.class);
+
     private final java.nio.channels.Selector selector;
     private final Map<Integer, SelectionKey> keys;
     private final List<NetworkSend> completedSends;
@@ -140,17 +144,12 @@ public class Selector implements Selectable {
      */
     @Override
     public void close() {
-        for (SelectionKey key : this.selector.keys()) {
-            try {
-                close(key);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
+        for (SelectionKey key : this.selector.keys())
+            close(key);
         try {
             this.selector.close();
         } catch (IOException e) {
-            e.printStackTrace();
+            log.error("Exception closing selector:", e);
         }
     }
 
@@ -201,9 +200,7 @@ public class Selector implements Selectable {
                 Transmissions transmissions = transmissions(key);
                 SocketChannel channel = channel(key);
                 try {
-                    /*
-                     * complete any connections that have finished their handshake
-                     */
+                    /* complete any connections that have finished their handshake */
                     if (key.isConnectable()) {
                         channel.finishConnect();
                         key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
@@ -222,9 +219,7 @@ public class Selector implements Selectable {
                         }
                     }
 
-                    /*
-                     * write to any sockets that have space in their buffer and for which we have data
-                     */
+                    /* write to any sockets that have space in their buffer and for which we have data */
                     if (key.isWritable()) {
                         transmissions.send.writeTo(channel);
                         if (transmissions.send.remaining() <= 0) {
@@ -238,7 +233,7 @@ public class Selector implements Selectable {
                     if (!key.isValid())
                         close(key);
                 } catch (IOException e) {
-                    e.printStackTrace();
+                    log.error("Error in I/O: ", e);
                     close(key);
                 }
             }
@@ -294,7 +289,7 @@ public class Selector implements Selectable {
     /**
      * Begin closing this connection
      */
-    private void close(SelectionKey key) throws IOException {
+    private void close(SelectionKey key) {
         SocketChannel channel = channel(key);
         Transmissions trans = transmissions(key);
         if (trans != null) {
@@ -305,8 +300,12 @@ public class Selector implements Selectable {
         }
         key.attach(null);
         key.cancel();
-        channel.socket().close();
-        channel.close();
+        try {
+            channel.socket().close();
+            channel.close();
+        } catch (IOException e) {
+            log.error("Exception closing connection to node {}:", trans.id, e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 457abb1..66cc2fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.requests;
 
@@ -25,7 +21,6 @@ import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Struct;
 
-
 /**
  * The header for a request in the Kafka protocol
  */
@@ -82,4 +77,9 @@ public class RequestHeader {
     public int sizeOf() {
         return header.sizeOf();
     }
+
+    @Override
+    public String toString() { // delegates to the toString() of the wrapped header object
+        return header.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
index c5e9020..27cbf39 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.requests;
 
@@ -21,7 +17,6 @@ import java.nio.ByteBuffer;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.protocol.types.Struct;
 
-
 /**
  * A send object for a kafka request
  */
@@ -52,4 +47,9 @@ public class RequestSend extends NetworkSend {
         return body;
     }
 
+    @Override
+    public String toString() { // renders as "RequestSend(header=<header>, body=<body>)" via each part's toString()
+        return "RequestSend(header=" + header.toString() + ", body=" + body.toString() + ")";
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
index 9ff793f..57247c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
@@ -1,32 +1,33 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.utils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A wrapper for Thread that sets things up nicely
  */
 public class KafkaThread extends Thread {
 
-    public KafkaThread(String name, Runnable runnable, boolean daemon) {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public KafkaThread(final String name, Runnable runnable, boolean daemon) {
         super(runnable, name);
         setDaemon(daemon);
         setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             public void uncaughtException(Thread t, Throwable e) {
-                e.printStackTrace();
+                log.error("Uncaught exception in " + name + ": ", e);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 9c34e7d..0c6b365 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.utils;
 
@@ -21,9 +17,10 @@ import java.nio.ByteBuffer;
 
 import org.apache.kafka.common.KafkaException;
 
-
 public class Utils {
 
+    public static String NL = System.getProperty("line.separator");
+
     /**
      * Turn the given UTF8 byte array into a string
      * 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1a53b97/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3dd562c..fb759d9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -474,6 +474,6 @@ class ReplicaManager(val config: KafkaConfig,
     info("Shut down")
     replicaFetcherManager.shutdown()
     checkpointHighWatermarks()
-    info("Shutted down completely")
+    info("Shut down completely")
   }
 }


Mime
View raw message