kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Improvements on Streams log4j
Date Wed, 22 Mar 2017 21:33:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 763ea5aad -> 57278aa82


MINOR: Improvements on Streams log4j

1. Add the thread id as a log prefix in the state directory classes; also add logging for lock activities.
2. Add logging for task creation and suspension.
3. Add more information to the rebalance listener logging.
4. Add the number of restored records to the changelog reader logging.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Eno Thereska, Damian Guy, Ewen Cheslack-Postava

Closes #2702 from guozhangwang/KMinor-streams-task-creation-log4j-improvements


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57278aa8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57278aa8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57278aa8

Branch: refs/heads/trunk
Commit: 57278aa82da5dc9d040eb3dcf4a182e0731a621b
Parents: 763ea5a
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Mar 22 14:33:34 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 22 14:33:34 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |   7 +-
 .../processor/internals/GlobalStreamThread.java |  11 +-
 .../internals/ProcessorStateManager.java        |   2 +
 .../processor/internals/StateDirectory.java     |  53 ++++++----
 .../processor/internals/StateRestorer.java      |  25 ++++-
 .../internals/StoreChangelogReader.java         |  39 +++++--
 .../streams/processor/internals/StreamTask.java |   4 +-
 .../processor/internals/StreamThread.java       | 101 ++++++++++++-------
 .../streams/state/internals/ThreadCache.java    |   2 +-
 .../internals/GlobalStreamThreadTest.java       |   4 +-
 .../processor/internals/StateRestorerTest.java  |  15 +++
 11 files changed, 179 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b23d244..2c116d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -344,13 +344,14 @@ public class KafkaStreams {
 
 
         if (globalTaskTopology != null) {
+            final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
                                                         clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId
+ "-global")),
-                                                        new StateDirectory(applicationId,
config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
+                                                        new StateDirectory(applicationId,
globalThreadId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
                                                         metrics,
                                                         time,
-                                                        clientId);
+                                                        globalThreadId);
         }
 
         for (int i = 0; i < threads.length; i++) {
@@ -568,7 +569,7 @@ public class KafkaStreams {
             localApplicationDir,
             appId);
 
-        final StateDirectory stateDirectory = new StateDirectory(appId, stateDir, Time.SYSTEM);
+        final StateDirectory stateDirectory = new StateDirectory(appId, "cleanup", stateDir,
Time.SYSTEM);
         stateDirectory.cleanRemovedTasks(0);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 8745655..36a248e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -41,6 +41,7 @@ import java.util.Map;
 public class GlobalStreamThread extends Thread {
 
     private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class);
+
     private final StreamsConfig config;
     private final Consumer<byte[], byte[]> consumer;
     private final StateDirectory stateDirectory;
@@ -57,17 +58,15 @@ public class GlobalStreamThread extends Thread {
                               final StateDirectory stateDirectory,
                               final Metrics metrics,
                               final Time time,
-                              final String clientId
-    ) {
-        super("GlobalStreamThread");
-        this.topology = topology;
+                              final String threadClientId) {
+        super(threadClientId);
+        this.time = time;
         this.config = config;
+        this.topology = topology;
         this.consumer = globalConsumer;
         this.stateDirectory = stateDirectory;
-        this.time = time;
         long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)
/
                 (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
-        final String threadClientId = clientId + "-" + getName();
         this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id",
threadClientId));
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 840b419..0e48ddd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -100,6 +100,8 @@ public class ProcessorStateManager implements StateManager {
         // load the checkpoint information
         checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
         this.checkpointedOffsets = new HashMap<>(checkpoint.read());
+
+        log.info("{} Created state store manager for task {} with the acquired state dir
lock", logPrefix, taskId);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 771bb61..85908e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -45,6 +45,7 @@ public class StateDirectory {
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
 
     private final File stateDir;
+    private final String logPrefix;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, FileLock> locks = new HashMap<>();
     private final Time time;
@@ -53,7 +54,12 @@ public class StateDirectory {
     private FileLock globalStateLock;
 
     public StateDirectory(final String applicationId, final String stateDirConfig, final
Time time) {
+        this(applicationId, "", stateDirConfig, time);
+    }
+
+    public StateDirectory(final String applicationId, final String threadId, final String
stateDirConfig, final Time time) {
         this.time = time;
+        this.logPrefix = String.format("stream-thread [%s]", threadId);
         final File baseDir = new File(stateDirConfig);
         if (!baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(String.format("state directory [%s] doesn't
exist and couldn't be created",
@@ -64,7 +70,6 @@ public class StateDirectory {
             throw new ProcessorStateException(String.format("state directory [%s] doesn't
exist and couldn't be created",
                                                             stateDir.getPath()));
         }
-
     }
 
     /**
@@ -100,6 +105,7 @@ public class StateDirectory {
     boolean lock(final TaskId taskId, int retry) throws IOException {
         // we already have the lock so bail out here
         if (locks.containsKey(taskId)) {
+            log.trace("{} Found cached state dir lock for task {}", logPrefix, taskId);
             return true;
         }
         final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
@@ -118,12 +124,15 @@ public class StateDirectory {
         final FileLock lock = tryLock(retry, channel);
         if (lock != null) {
             locks.put(taskId, lock);
+
+            log.debug("{} Acquired state dir lock for task {}", logPrefix, taskId);
         }
         return lock != null;
     }
 
     boolean lockGlobalState(final int retry) throws IOException {
         if (globalStateLock != null) {
+            log.trace("{} Found cached state dir lock for the global task", logPrefix);
             return true;
         }
 
@@ -144,6 +153,9 @@ public class StateDirectory {
         }
         globalStateChannel = channel;
         globalStateLock = fileLock;
+
+        log.debug("{} Acquired global state dir lock", logPrefix);
+
         return true;
     }
 
@@ -155,24 +167,10 @@ public class StateDirectory {
         globalStateChannel.close();
         globalStateLock = null;
         globalStateChannel = null;
-    }
 
-    private FileLock tryLock(int retry, final FileChannel channel) throws IOException {
-        FileLock lock = tryAcquireLock(channel);
-        while (lock == null && retry > 0) {
-            try {
-                Thread.sleep(200);
-            } catch (Exception ex) {
-                // do nothing
-            }
-            retry--;
-            lock = tryAcquireLock(channel);
-        }
-        return lock;
+        log.debug("{} Released global state dir lock", logPrefix);
     }
 
-
-
     /**
      * Unlock the state directory for the given {@link TaskId}
      * @param taskId
@@ -182,6 +180,9 @@ public class StateDirectory {
         final FileLock lock = locks.remove(taskId);
         if (lock != null) {
             lock.release();
+
+            log.debug("{} Released state dir lock for task {}", logPrefix, taskId);
+
             final FileChannel fileChannel = channels.remove(taskId);
             if (fileChannel != null) {
                 fileChannel.close();
@@ -209,19 +210,19 @@ public class StateDirectory {
                 try {
                     if (lock(id, 0)) {
                         if (time.milliseconds() > taskDir.lastModified() + cleanupDelayMs)
{
-                            log.info("Deleting obsolete state directory {} for task {} as
cleanup delay of {} ms has passed", dirName, id, cleanupDelayMs);
+                            log.info("{} Deleting obsolete state directory {} for task {}
as cleanup delay of {} ms has passed", logPrefix, dirName, id, cleanupDelayMs);
                             Utils.delete(taskDir);
                         }
                     }
                 } catch (OverlappingFileLockException e) {
                     // locked by another thread
                 } catch (IOException e) {
-                    log.error("Failed to lock the state directory due to an unexpected exception",
e);
+                    log.error("{} Failed to lock the state directory due to an unexpected
exception", logPrefix, e);
                 } finally {
                     try {
                         unlock(id);
                     } catch (IOException e) {
-                        log.error("Failed to release the state directory lock");
+                        log.error("{} Failed to release the state directory lock", logPrefix);
                     }
                 }
             }
@@ -243,6 +244,20 @@ public class StateDirectory {
         });
     }
 
+    private FileLock tryLock(int retry, final FileChannel channel) throws IOException {
+        FileLock lock = tryAcquireLock(channel);
+        while (lock == null && retry > 0) {
+            try {
+                Thread.sleep(200);
+            } catch (Exception ex) {
+                // do nothing
+            }
+            retry--;
+            lock = tryAcquireLock(channel);
+        }
+        return lock;
+    }
+
     private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath)
throws IOException {
         if (!channels.containsKey(taskId)) {
             channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE));

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 4edd71c..79bfd1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -21,12 +21,15 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 
 public class StateRestorer {
     static final int NO_CHECKPOINT = -1;
-    private final TopicPartition partition;
-    private final StateRestoreCallback stateRestoreCallback;
+
     private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
+    private final TopicPartition partition;
+    private final StateRestoreCallback stateRestoreCallback;
+
     private long restoredOffset;
+    private long startingOffset;
 
     StateRestorer(final TopicPartition partition,
                   final StateRestoreCallback stateRestoreCallback,
@@ -44,15 +47,15 @@ public class StateRestorer {
         return partition;
     }
 
-    public long checkpoint() {
+    long checkpoint() {
         return checkpoint == null ? NO_CHECKPOINT : checkpoint;
     }
 
-    public void restore(final byte[] key, final byte[] value) {
+    void restore(final byte[] key, final byte[] value) {
         stateRestoreCallback.restore(key, value);
     }
 
-    public boolean isPersistent() {
+    boolean isPersistent() {
         return persistent;
     }
 
@@ -60,6 +63,14 @@ public class StateRestorer {
         this.restoredOffset = Math.min(offsetLimit, restoredOffset);
     }
 
+    void setStartingOffset(final long startingOffset) {
+        this.startingOffset = Math.min(offsetLimit, startingOffset);
+    }
+
+    long startingOffset() {
+        return startingOffset;
+    }
+
     boolean hasCompleted(final long recordOffset, final long endOffset) {
         return endOffset == 0 || recordOffset >= readTo(endOffset);
     }
@@ -68,6 +79,10 @@ public class StateRestorer {
         return restoredOffset;
     }
 
+    long restoredNumRecords() {
+        return restoredOffset - startingOffset;
+    }
+
     long offsetLimit() {
         return offsetLimit;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 0afd6c9..8639382 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -39,18 +39,23 @@ public class StoreChangelogReader implements ChangelogReader {
     private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
 
     private final Consumer<byte[], byte[]> consumer;
+    private final String logPrefix;
     private final Time time;
     private final long partitionValidationTimeoutMs;
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
 
-
-    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time
time, final long partitionValidationTimeoutMs) {
-        this.consumer = consumer;
+    public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]>
consumer, final Time time, final long partitionValidationTimeoutMs) {
         this.time = time;
+        this.consumer = consumer;
         this.partitionValidationTimeoutMs = partitionValidationTimeoutMs;
+
+        this.logPrefix = String.format("stream-thread [%s]", threadId);
     }
 
+    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time
time, final long partitionValidationTimeoutMs) {
+        this("", consumer, time, partitionValidationTimeoutMs);
+    }
 
     @Override
     public void validatePartitionExists(final TopicPartition topicPartition, final String
storeName) {
@@ -60,7 +65,7 @@ public class StoreChangelogReader implements ChangelogReader {
             try {
                 partitionInfo.putAll(consumer.listTopics());
             } catch (final TimeoutException e) {
-                log.warn("Could not list topics so will fall back to partition by partition
fetching");
+                log.warn("{} Could not list topics so will fall back to partition by partition
fetching", logPrefix);
             }
         }
 
@@ -81,7 +86,7 @@ public class StoreChangelogReader implements ChangelogReader {
             throw new StreamsException(String.format("Store %s's change log (%s) does not
contain partition %s",
                                                      storeName, topicPartition.topic(), topicPartition.partition()));
         }
-        log.debug("Took {} ms to validate that partition {} exists", time.milliseconds()
- start, topicPartition);
+        log.debug("{} Took {} ms to validate that partition {} exists", logPrefix, time.milliseconds()
- start, topicPartition);
     }
 
     @Override
@@ -99,7 +104,6 @@ public class StoreChangelogReader implements ChangelogReader {
             }
             final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(stateRestorers.keySet());
 
-
             // remove any partitions where we already have all of the data
             final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
             for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet())
{
@@ -113,6 +117,8 @@ public class StoreChangelogReader implements ChangelogReader {
                 }
             }
 
+            log.info("{} Starting restoring state stores from changelog topics {}", logPrefix,
needsRestoring.keySet());
+
             consumer.assign(needsRestoring.keySet());
 
             for (final StateRestorer restorer : needsRestoring.values()) {
@@ -127,6 +133,11 @@ public class StoreChangelogReader implements ChangelogReader {
                                       consumer.position(restorer.partition()),
                                       endOffsets.get(restorer.partition()));
                 }
+                // TODO: each consumer.position() call after seekToBeginning will cause
+                // a blocking round trip to reset the position for that partition; we should
+                // batch them into a single round trip to reset for all necessary partitions
+
+                restorer.setStartingOffset(consumer.position(restorer.partition()));
             }
 
             final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
@@ -139,12 +150,13 @@ public class StoreChangelogReader implements ChangelogReader {
             }
         } finally {
             consumer.assign(Collections.<TopicPartition>emptyList());
-            log.debug("Took {} ms to restore active state", time.milliseconds() - start);
+            log.debug("{} Took {} ms to restore all active states", logPrefix, time.milliseconds()
- start);
         }
     }
 
     private void logRestoreOffsets(final TopicPartition partition, final long checkpoint,
final Long aLong) {
-        log.debug("restoring partition {} from offset {} to endOffset {}",
+        log.debug("{} Restoring partition {} from offset {} to endOffset {}",
+                  logPrefix,
                   partition,
                   checkpoint,
                   aLong);
@@ -177,7 +189,16 @@ public class StoreChangelogReader implements ChangelogReader {
                                       endOffset,
                                       pos));
             }
+
             restorer.setRestoredOffset(pos);
+
+            log.debug("{} Completed restoring state from changelog {} with {} records ranging
from offset {} to {}",
+                    logPrefix,
+                    topicPartition,
+                    restorer.restoredNumRecords(),
+                    restorer.startingOffset(),
+                    restorer.restoredOffset());
+
             partitionIterator.remove();
         }
     }
@@ -209,6 +230,4 @@ public class StoreChangelogReader implements ChangelogReader {
         return false;
 
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 609878a..7bd4be4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -68,7 +68,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
     private Runnable commitDelegate = new Runnable() {
         @Override
         public void run() {
-            log.debug("{} Committing its state", logPrefix);
             // 1) flush local state
             stateMgr.flush(processorContext);
 
@@ -89,7 +88,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
-     * @param restoreConsumer       the instance of {@link Consumer} used when restoring
state
+     * @param changelogReader       the instance of {@link ChangelogReader} used for restoring
state
      * @param config                the {@link StreamsConfig} specified by the user
      * @param metrics               the {@link StreamsMetrics} created by the thread
      * @param stateDirectory        the {@link StateDirectory} created by the thread
@@ -126,7 +125,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
         this.logPrefix = String.format("task [%s]", id);
 
-
         this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
         // initialize the consumed offset cache

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7cd4b93..61d7d72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -259,7 +259,6 @@ public class StreamThread extends Thread {
         }
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics);
 
-
         this.logPrefix = String.format("stream-thread [%s]", threadClientId);
 
         // set the producer and consumer clients
@@ -271,7 +270,7 @@ public class StreamThread extends Thread {
 
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals(""))
{
             originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            log.info("{} custom offset resets specified updating configs original auto offset
reset {}", logPrefix, originalReset);
+            log.info("{} Custom offset resets specified updating configs original auto offset
reset {}", logPrefix, originalReset);
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
 
@@ -292,7 +291,7 @@ public class StreamThread extends Thread {
         // standby ktables
         this.standbyRecords = new HashMap<>();
 
-        this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG),
time);
+        this.stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG),
time);
         final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
         this.rebalanceTimeoutMs =  (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
maxPollInterval, Type.INT);
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
@@ -693,7 +692,8 @@ public class StreamThread extends Thread {
     protected void maybeCommit(final long now) {
 
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
-            log.info("{} Committing all tasks because the commit interval {}ms has elapsed",
logPrefix, commitTimeMs);
+            log.info("{} Committing all active tasks {} and standby tasks {} because the
commit interval {}ms has elapsed",
+                    logPrefix, commitTimeMs, activeTasks, standbyTasks);
 
             commitAll();
             lastCommitMs = now;
@@ -717,7 +717,6 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
-        log.trace("{} Committing all its owned tasks", logPrefix);
         for (StreamTask task : activeTasks.values()) {
             commitOne(task);
         }
@@ -782,22 +781,6 @@ public class StreamThread extends Thread {
         return tasks;
     }
 
-    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions)
{
-        log.info("{} Creating active task {} with assigned partitions {}", logPrefix, id,
partitions);
-
-        streamsMetrics.taskCreatedSensor.record();
-
-        final ProcessorTopology topology = builder.build(id.topicGroupId);
-        final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
-        final long start = time.milliseconds();
-        try {
-            return new StreamTask(id, applicationId, partitions, topology, consumer, storeChangelogReader,
config, streamsMetrics, stateDirectory, cache, time, recordCollector);
-        } finally {
-            log.debug("{} creation of active task {} took {} ms", logPrefix, id, time.milliseconds()
- start);
-        }
-    }
-
-
     private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition>
partitions) {
         if (suspendedTasks.containsKey(taskId)) {
             final StreamTask task = suspendedTasks.get(taskId);
@@ -826,7 +809,7 @@ public class StreamThread extends Thread {
             final StreamTask task = next.getValue();
             final Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey());
             if (!task.partitions().equals(assignedPartitionsForTask)) {
-                log.debug("{} closing suspended non-assigned task", logPrefix);
+                log.debug("{} Closing suspended non-assigned active task {}", logPrefix,
task.id);
                 try {
                     task.close();
                     task.closeStateManager(true);
@@ -846,13 +829,13 @@ public class StreamThread extends Thread {
         while (standByTaskIterator.hasNext()) {
             final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
             if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) {
-                log.debug("{} Closing suspended non-assigned standby task {}", logPrefix,
suspendedTask.getKey());
                 final StandbyTask task = suspendedTask.getValue();
+                log.debug("{} Closing suspended non-assigned standby task {}", logPrefix,
task.id);
                 try {
                     task.close();
                     task.closeStateManager(true);
                 } catch (Exception e) {
-                    log.error("{} Failed to remove suspended task standby {}", logPrefix,
suspendedTask.getKey(), e);
+                    log.error("{} Failed to remove suspended standby task {}", logPrefix,
task.id, e);
                 } finally {
                     standByTaskIterator.remove();
                 }
@@ -860,6 +843,20 @@ public class StreamThread extends Thread {
         }
     }
 
+    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions)
{
+        log.debug("{} Creating new active task {} with assigned partitions {}", logPrefix,
id, partitions);
+
+        streamsMetrics.taskCreatedSensor.record();
+
+        final ProcessorTopology topology = builder.build(id.topicGroupId);
+        final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
+        try {
+            return new StreamTask(id, applicationId, partitions, topology, consumer, storeChangelogReader,
config, streamsMetrics, stateDirectory, cache, time, recordCollector);
+        } finally {
+            log.info("{} Created active task {} with assigned partitions {}", logPrefix,
id, partitions);
+        }
+    }
+
     private void addStreamTasks(Collection<TopicPartition> assignment, final long start)
{
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been
initialized while adding stream tasks: this should not happen.");
@@ -898,19 +895,27 @@ public class StreamThread extends Thread {
 
         // create all newly assigned tasks (guard against race condition with other thread
via backoff and retry)
         // -> other thread will call removeSuspendedTasks(); eventually
+        log.debug("{} new active tasks to be created: {}", logPrefix, newTasks);
+
         taskCreator.retryWithBackoff(newTasks, start);
     }
 
-    StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions)
{
-        log.info("{} Creating new standby task {} with assigned partitions {}", logPrefix,
id, partitions);
+    private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions)
{
+        log.debug("{} Creating new standby task {} with assigned partitions {}", logPrefix,
id, partitions);
 
         streamsMetrics.taskCreatedSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStores().isEmpty()) {
-            return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader,
config, streamsMetrics, stateDirectory);
+            try {
+                return new StandbyTask(id, applicationId, partitions, topology, consumer,
storeChangelogReader, config, streamsMetrics, stateDirectory);
+            } finally {
+                log.info("{} Created standby task {} with assigned partitions {}", logPrefix,
id, partitions);
+            }
         } else {
+            log.info("{} Skipped standby task {} with assigned partitions {} since it does
not have any state stores to materialize", logPrefix, id, partitions);
+
             return null;
         }
     }
@@ -942,6 +947,8 @@ public class StreamThread extends Thread {
 
         // create all newly assigned standby tasks (guard against race condition with other
thread via backoff and retry)
         // -> other thread will call removeSuspendedStandbyTasks(); eventually
+        log.debug("{} new standby tasks to be created: {}", logPrefix, newStandbyTasks);
+
         new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start);
 
         restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
@@ -1006,7 +1013,7 @@ public class StreamThread extends Thread {
         return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
-                log.info("{} Closing a task {}", StreamThread.this.logPrefix, task.id());
+                log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id());
                 task.close();
                 streamsMetrics.tasksClosedSensor.record();
             }
@@ -1017,7 +1024,7 @@ public class StreamThread extends Thread {
         return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
-                log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix,
task.id());
+                log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id());
                 task.closeTopology();
                 streamsMetrics.tasksClosedSensor.record();
             }
@@ -1168,7 +1175,6 @@ public class StreamThread extends Thread {
 
     class TaskCreator extends AbstractTaskCreator {
         void createTask(final TaskId taskId, final Set<TopicPartition> partitions)
{
-            log.debug("{} creating new task {}", logPrefix, taskId);
             final StreamTask task = createStreamTask(taskId, partitions);
 
             activeTasks.put(taskId, task);
@@ -1187,7 +1193,6 @@ public class StreamThread extends Thread {
         }
 
         void createTask(final TaskId taskId, final Set<TopicPartition> partitions)
{
-            log.debug("{} creating new standby task {}", logPrefix, taskId);
             final StandbyTask task = createStandbyTask(taskId, partitions);
             updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
         }
@@ -1204,10 +1209,19 @@ public class StreamThread extends Thread {
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
+            log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.\n"
+
+                            "\tassigned active tasks: {}\n" +
+                            "\tassigned standby tasks: {}\n" +
+                            "\tcurrent suspended active tasks: {}\n" +
+                            "\tcurrent suspended standby tasks: {}\n" +
+                            "\tprevious active tasks: {}",
+                    logPrefix, state, assignment,
+                    partitionAssignor.activeTasks(), partitionAssignor.standbyTasks(),
+                    suspendedTasks, suspendedStandbyTasks, prevActiveTasks);
+
             final long start = time.milliseconds();
             try {
-                log.info("{} at state {}: new partitions {} assigned at the end of consumer
rebalance.", logPrefix, state, assignment);
-                storeChangelogReader = new StoreChangelogReader(restoreConsumer, time, requestTimeOut);
+                storeChangelogReader = new StoreChangelogReader(getName(), restoreConsumer,
time, requestTimeOut);
                 setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
                 // do this first as we may have suspended standby tasks that
                 // will become active or vice versa
@@ -1223,14 +1237,24 @@ public class StreamThread extends Thread {
                 rebalanceException = t;
                 throw t;
             } finally {
-                log.debug("{} partition assignment took {} ms", logPrefix, time.milliseconds()
- start);
+                log.info("{} partition assignment took {} ms.\n" +
+                        "\tcurrent active tasks: {}\n" +
+                        "\tcurrent standby tasks: {}",
+                        logPrefix, time.milliseconds() - start,
+                        activeTasks, standbyTasks);
             }
         }
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
+            log.info("{} at state {}: partitions {} revoked at the beginning of consumer
rebalance.\n" +
+                            "\tcurrent assigned active tasks: {}\n" +
+                            "\tcurrent assigned standby tasks: {}\n",
+                    logPrefix, state, assignment,
+                    activeTasks, standbyTasks);
+
+            final long start = time.milliseconds();
             try {
-                log.info("{} at state {}: partitions {} revoked at the beginning of consumer
rebalance.", logPrefix, state, assignment);
                 setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions
are assigned
                 // suspend active tasks
@@ -1242,6 +1266,13 @@ public class StreamThread extends Thread {
                 streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(),
partitionAssignor.clusterMetadata());
                 removeStreamTasks();
                 removeStandbyTasks();
+
+                log.info("{} partition revocation took {} ms.\n" +
+                                "\tsuspended active tasks: {}\n" +
+                                "\tsuspended standby tasks: {}\n" +
+                                "\tprevious active tasks: {}\n",
+                        logPrefix, time.milliseconds() - start,
+                        suspendedTasks, suspendedStandbyTasks, prevActiveTasks);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 5281814..45a6488 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -96,7 +96,7 @@ public class ThreadCache {
         }
         cache.flush();
 
-        log.debug("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
+        log.trace("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
                   name, puts(), gets(), evicts(), flushes());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 97372b8..30582ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -62,7 +62,7 @@ public class GlobalStreamThreadTest {
                                                     new StateDirectory("appId", TestUtils.tempDirectory().getPath(),
time),
                                                     new Metrics(),
                                                     new MockTime(),
-                                                    "client");
+                                                    "clientId");
     }
 
     @Test
@@ -93,7 +93,7 @@ public class GlobalStreamThreadTest {
                                                     new StateDirectory("appId", TestUtils.tempDirectory().getPath(),
time),
                                                     new Metrics(),
                                                     new MockTime(),
-                                                    "client");
+                                                    "clientId");
 
         try {
             globalStreamThread.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index ead19c7..a847a94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -65,5 +65,20 @@ public class StateRestorerTest {
         assertThat(restorer.restoredOffset(), equalTo(OFFSET_LIMIT));
     }
 
+    @Test
+    public void shouldSetStartingOffsetToMinOfLimitAndOffset() throws Exception {
+        restorer.setStartingOffset(20);
+        assertThat(restorer.startingOffset(), equalTo(20L));
+        restorer.setRestoredOffset(100);
+        assertThat(restorer.restoredOffset(), equalTo(OFFSET_LIMIT));
+    }
 
+    @Test
+    public void shouldReturnCorrectNumRestoredRecords() throws Exception {
+        restorer.setStartingOffset(20);
+        restorer.setRestoredOffset(40);
+        assertThat(restorer.restoredNumRecords(), equalTo(20L));
+        restorer.setRestoredOffset(100);
+        assertThat(restorer.restoredNumRecords(), equalTo(OFFSET_LIMIT - 20));
+    }
 }
\ No newline at end of file


Mime
View raw message