From: guozhang@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Message-Id: <74011a16612a4c7eb01d5942fa3b34f1@git.apache.org>
Subject: kafka git commit: MINOR: Improvements on Streams log4j
Date: Wed, 22 Mar 2017 21:33:37 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk 763ea5aad -> 57278aa82


MINOR: Improvements on Streams log4j

1. Add the thread id as a prefix in the state directory classes; also add logs for lock activities.
2. Add logging for task creation / suspension.
3. Add more information to the rebalance listener logging.
4. Add the number of restored records to the changelog reader.
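For context, every logging change in this commit hangs off a per-thread prefix of the form "stream-thread [<threadId>]". A minimal sketch of the pattern follows (assuming SLF4J, which Kafka Streams uses; the class and method names here are illustrative only, not part of the commit -- the format string and log message match the diff below):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class StreamThreadLoggingSketch {
        private static final Logger log = LoggerFactory.getLogger(StreamThreadLoggingSketch.class);
        private final String logPrefix;

        StreamThreadLoggingSketch(final String threadId) {
            // same prefix format the commit adds to StateDirectory and StoreChangelogReader
            this.logPrefix = String.format("stream-thread [%s]", threadId);
        }

        void onLockAcquired(final String taskId) {
            // every message carries the prefix so interleaved log lines from
            // multiple stream threads in one application instance can be told apart
            log.debug("{} Acquired state dir lock for task {}", logPrefix, taskId);
        }
    }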
Author: Guozhang Wang
Reviewers: Eno Thereska, Damian Guy, Ewen Cheslack-Postava

Closes #2702 from guozhangwang/KMinor-streams-task-creation-log4j-improvements


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57278aa8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57278aa8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57278aa8

Branch: refs/heads/trunk
Commit: 57278aa82da5dc9d040eb3dcf4a182e0731a621b
Parents: 763ea5a
Author: Guozhang Wang
Authored: Wed Mar 22 14:33:34 2017 -0700
Committer: Guozhang Wang
Committed: Wed Mar 22 14:33:34 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |   7 +-
 .../processor/internals/GlobalStreamThread.java |  11 +-
 .../internals/ProcessorStateManager.java        |   2 +
 .../processor/internals/StateDirectory.java     |  53 ++++++----
 .../processor/internals/StateRestorer.java      |  25 ++++-
 .../internals/StoreChangelogReader.java         |  39 +++++--
 .../streams/processor/internals/StreamTask.java |   4 +-
 .../processor/internals/StreamThread.java       | 101 ++++++++++++-------
 .../streams/state/internals/ThreadCache.java    |   2 +-
 .../internals/GlobalStreamThreadTest.java       |   4 +-
 .../processor/internals/StateRestorerTest.java  |  15 +++
 11 files changed, 179 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b23d244..2c116d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -344,13 +344,14 @@ public class KafkaStreams {
 
         if (globalTaskTopology != null) {
+            final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
                                                         clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
-                                                        new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
+                                                        new StateDirectory(applicationId, globalThreadId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
                                                         metrics,
                                                         time,
-                                                        clientId);
+                                                        globalThreadId);
         }
 
         for (int i = 0; i < threads.length; i++) {
@@ -568,7 +569,7 @@ public class KafkaStreams {
                      localApplicationDir,
                      appId);
 
-        final StateDirectory stateDirectory = new StateDirectory(appId, stateDir, Time.SYSTEM);
+        final StateDirectory stateDirectory = new StateDirectory(appId, "cleanup", stateDir, Time.SYSTEM);
         stateDirectory.cleanRemovedTasks(0);
     }
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 8745655..36a248e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -41,6 +41,7 @@ import java.util.Map;
 
 public class GlobalStreamThread extends Thread {
 
     private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class);
+
     private final StreamsConfig config;
     private final Consumer<byte[], byte[]> consumer;
     private final StateDirectory stateDirectory;
@@ -57,17 +58,15 @@ public class GlobalStreamThread extends Thread {
                               final StateDirectory stateDirectory,
                               final Metrics metrics,
                               final Time time,
-                              final String clientId
-                              ) {
-        super("GlobalStreamThread");
-        this.topology = topology;
+                              final String threadClientId) {
+        super(threadClientId);
+        this.time = time;
         this.config = config;
+        this.topology = topology;
         this.consumer = globalConsumer;
         this.stateDirectory = stateDirectory;
-        this.time = time;
         long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
                 (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
-        final String threadClientId = clientId + "-" + getName();
         this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
     }
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 840b419..0e48ddd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -100,6 +100,8 @@ public class ProcessorStateManager implements StateManager {
         // load the checkpoint information
         checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
         this.checkpointedOffsets = new HashMap<>(checkpoint.read());
+
+        log.info("{} Created state store manager for task {} with the acquired state dir lock", logPrefix, taskId);
     }
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 771bb61..85908e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -45,6 +45,7 @@ public class StateDirectory {
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
 
     private final File stateDir;
+    private final String logPrefix;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, FileLock> locks = new HashMap<>();
     private final Time time;
@@ -53,7 +54,12 @@ public class StateDirectory {
     private FileLock globalStateLock;
 
     public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) {
+        this(applicationId, "", stateDirConfig, time);
+    }
+
+    public StateDirectory(final String applicationId, final String threadId, final String stateDirConfig, final Time time) {
         this.time = time;
+        this.logPrefix = String.format("stream-thread [%s]", threadId);
         final File baseDir = new File(stateDirConfig);
         if (!baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
@@ -64,7 +70,6 @@ public class StateDirectory {
             throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
                 stateDir.getPath()));
         }
-
     }
 
     /**
@@ -100,6 +105,7 @@ public class StateDirectory {
     boolean lock(final TaskId taskId, int retry) throws IOException {
         // we already have the lock so bail out here
         if (locks.containsKey(taskId)) {
+            log.trace("{} Found cached state dir lock for task {}", logPrefix, taskId);
             return true;
         }
         final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
@@ -118,12 +124,15 @@ public class StateDirectory {
         final FileLock lock = tryLock(retry, channel);
         if (lock != null) {
             locks.put(taskId, lock);
+
+            log.debug("{} Acquired state dir lock for task {}", logPrefix, taskId);
         }
         return lock != null;
     }
 
     boolean lockGlobalState(final int retry) throws IOException {
         if (globalStateLock != null) {
+            log.trace("{} Found cached state dir lock for the global task", logPrefix);
             return true;
         }
 
@@ -144,6 +153,9 @@ public class StateDirectory {
         }
         globalStateChannel = channel;
         globalStateLock = fileLock;
+
+        log.debug("{} Acquired global state dir lock", logPrefix);
+
         return true;
     }
 
@@ -155,24 +167,10 @@ public class StateDirectory {
         globalStateChannel.close();
         globalStateLock = null;
         globalStateChannel = null;
-    }
 
-    private FileLock tryLock(int retry, final FileChannel channel) throws IOException {
-        FileLock lock = tryAcquireLock(channel);
-        while (lock == null && retry > 0) {
-            try {
-                Thread.sleep(200);
-            } catch (Exception ex) {
-                // do nothing
-            }
-            retry--;
-            lock = tryAcquireLock(channel);
-        }
-        return lock;
+        log.debug("{} Released global state dir lock", logPrefix);
     }
-
-
     /**
      * Unlock the state directory for the given {@link TaskId}
      * @param taskId
@@ -182,6 +180,9 @@ public class StateDirectory {
         final FileLock lock = locks.remove(taskId);
         if (lock != null) {
             lock.release();
+
+            log.debug("{} Released state dir lock for task {}", logPrefix, taskId);
+
             final FileChannel fileChannel = channels.remove(taskId);
             if (fileChannel != null) {
                 fileChannel.close();
@@ -209,19 +210,19 @@ public class StateDirectory {
             try {
                 if (lock(id, 0)) {
                     if (time.milliseconds() > taskDir.lastModified() + cleanupDelayMs) {
-                        log.info("Deleting obsolete state directory {} for task {} as cleanup delay of {} ms has passed", dirName, id, cleanupDelayMs);
+                        log.info("{} Deleting obsolete state directory {} for task {} as cleanup delay of {} ms has passed", logPrefix, dirName, id, cleanupDelayMs);
                         Utils.delete(taskDir);
                     }
                 }
             } catch (OverlappingFileLockException e) {
                 // locked by another thread
             } catch (IOException e) {
-                log.error("Failed to lock the state directory due to an unexpected exception", e);
+                log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix, e);
             } finally {
                 try {
                     unlock(id);
                 } catch (IOException e) {
-                    log.error("Failed to release the state directory lock");
+                    log.error("{} Failed to release the state directory lock", logPrefix);
                 }
             }
@@ -243,6 +244,20 @@ public class StateDirectory {
         });
     }
 
+    private FileLock tryLock(int retry, final FileChannel channel) throws IOException {
+        FileLock lock = tryAcquireLock(channel);
+        while (lock == null && retry > 0) {
+            try {
+                Thread.sleep(200);
+            } catch (Exception ex) {
+                // do nothing
+            }
+            retry--;
+            lock = tryAcquireLock(channel);
+        }
+        return lock;
+    }
+
     private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException {
         if (!channels.containsKey(taskId)) {
             channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 4edd71c..79bfd1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -21,12 +21,15 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 
 public class StateRestorer {
     static final int NO_CHECKPOINT = -1;
 
-    private final TopicPartition partition;
-    private final StateRestoreCallback stateRestoreCallback;
+
     private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
+    private final TopicPartition partition;
+    private final StateRestoreCallback stateRestoreCallback;
+
     private long restoredOffset;
+    private long startingOffset;
 
     StateRestorer(final TopicPartition partition,
                   final StateRestoreCallback stateRestoreCallback,
@@ -44,15 +47,15 @@ public class StateRestorer {
         return partition;
     }
 
-    public long checkpoint() {
+    long checkpoint() {
         return checkpoint == null ? NO_CHECKPOINT : checkpoint;
     }
 
-    public void restore(final byte[] key, final byte[] value) {
+    void restore(final byte[] key, final byte[] value) {
         stateRestoreCallback.restore(key, value);
     }
 
-    public boolean isPersistent() {
+    boolean isPersistent() {
         return persistent;
     }
@@ -60,6 +63,14 @@ public class StateRestorer {
         this.restoredOffset = Math.min(offsetLimit, restoredOffset);
     }
 
+    void setStartingOffset(final long startingOffset) {
+        this.startingOffset = Math.min(offsetLimit, startingOffset);
+    }
+
+    long startingOffset() {
+        return startingOffset;
+    }
+
     boolean hasCompleted(final long recordOffset, final long endOffset) {
         return endOffset == 0 || recordOffset >= readTo(endOffset);
     }
@@ -68,6 +79,10 @@ public class StateRestorer {
         return restoredOffset;
     }
 
+    long restoredNumRecords() {
+        return restoredOffset - startingOffset;
+    }
+
     long offsetLimit() {
         return offsetLimit;
     }
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 0afd6c9..8639382 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -39,18 +39,23 @@ public class StoreChangelogReader implements ChangelogReader {
     private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
 
     private final Consumer<byte[], byte[]> consumer;
+    private final String logPrefix;
     private final Time time;
     private final long partitionValidationTimeoutMs;
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
-
-    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
-        this.consumer = consumer;
+    public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
         this.time = time;
+        this.consumer = consumer;
         this.partitionValidationTimeoutMs = partitionValidationTimeoutMs;
+
+        this.logPrefix = String.format("stream-thread [%s]", threadId);
     }
 
+    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
+        this("", consumer, time, partitionValidationTimeoutMs);
+    }
 
     @Override
     public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) {
@@ -60,7 +65,7 @@ public class StoreChangelogReader implements ChangelogReader {
         try {
             partitionInfo.putAll(consumer.listTopics());
         } catch (final TimeoutException e) {
-            log.warn("Could not list topics so will fall back to partition by partition fetching");
+            log.warn("{} Could not list topics so will fall back to partition by partition fetching", logPrefix);
         }
     }
@@ -81,7 +86,7 @@ public class StoreChangelogReader implements ChangelogReader {
             throw new StreamsException(String.format("Store %s's change log (%s) does not contain partition %s",
                     storeName, topicPartition.topic(), topicPartition.partition()));
         }
-        log.debug("Took {} ms to validate that partition {} exists", time.milliseconds() - start, topicPartition);
+        log.debug("{} Took {} ms to validate that partition {} exists", logPrefix, time.milliseconds() - start, topicPartition);
     }
 
     @Override
@@ -99,7 +104,6 @@ public class StoreChangelogReader implements ChangelogReader {
         }
 
         final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(stateRestorers.keySet());
-
         // remove any partitions where we already have all of the data
         final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
         for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
@@ -113,6 +117,8 @@ public class StoreChangelogReader implements ChangelogReader {
             }
         }
 
+        log.info("{} Starting restoring state stores from changelog topics {}", logPrefix, needsRestoring.keySet());
+
         consumer.assign(needsRestoring.keySet());
 
         for (final StateRestorer restorer : needsRestoring.values()) {
@@ -127,6 +133,11 @@ public class StoreChangelogReader implements ChangelogReader {
                                   consumer.position(restorer.partition()),
                                   endOffsets.get(restorer.partition()));
             }
+            // TODO: each consumer.position() call after seekToBeginning will cause
+            // a blocking round trip to reset the position for that partition; we should
+            // batch them into a single round trip to reset for all necessary partitions
+
+            restorer.setStartingOffset(consumer.position(restorer.partition()));
         }
 
         final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
@@ -139,12 +150,13 @@ public class StoreChangelogReader implements ChangelogReader {
             }
         } finally {
            consumer.assign(Collections.<TopicPartition>emptyList());
-            log.debug("Took {} ms to restore active state", time.milliseconds() - start);
+            log.debug("{} Took {} ms to restore all active states", logPrefix, time.milliseconds() - start);
         }
     }
 
     private void logRestoreOffsets(final TopicPartition partition, final long checkpoint, final Long aLong) {
-        log.debug("restoring partition {} from offset {} to endOffset {}",
+        log.debug("{} Restoring partition {} from offset {} to endOffset {}",
+                  logPrefix,
                   partition,
                   checkpoint,
                   aLong);
@@ -177,7 +189,16 @@ public class StoreChangelogReader implements ChangelogReader {
                         endOffset,
                         pos));
             }
+
             restorer.setRestoredOffset(pos);
+
+            log.debug("{} Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
+                    logPrefix,
+                    topicPartition,
+                    restorer.restoredNumRecords(),
+                    restorer.startingOffset(),
+                    restorer.restoredOffset());
+
             partitionIterator.remove();
         }
     }
@@ -209,6 +230,4 @@ public class StoreChangelogReader implements ChangelogReader {
 
         return false;
     }
-
-
 }
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 609878a..7bd4be4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -68,7 +68,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
     private Runnable commitDelegate = new Runnable() {
         @Override
         public void run() {
-            log.debug("{} Committing its state", logPrefix);
             // 1) flush local state
             stateMgr.flush(processorContext);
@@ -89,7 +88,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param partitions             the collection of assigned {@link TopicPartition}
      * @param topology               the instance of {@link ProcessorTopology}
      * @param consumer               the instance of {@link Consumer}
-     * @param restoreConsumer        the instance of {@link Consumer} used when restoring state
+     * @param changelogReader        the instance of {@link ChangelogReader} used for restoring state
      * @param config                 the {@link StreamsConfig} specified by the user
      * @param metrics                the {@link StreamsMetrics} created by the thread
      * @param stateDirectory         the {@link StateDirectory} created by the thread
@@ -126,7 +125,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
         this.logPrefix = String.format("task [%s]", id);
 
-
         this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
         // initialize the consumed offset cache
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7cd4b93..61d7d72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -259,7 +259,6 @@ public class StreamThread extends Thread {
         }
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics);
 
-
         this.logPrefix = String.format("stream-thread [%s]", threadClientId);
 
         // set the producer and consumer clients
@@ -271,7 +270,7 @@ public class StreamThread extends Thread {
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
             originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            log.info("{} custom offset resets specified updating configs original auto offset reset {}", logPrefix, originalReset);
+            log.info("{} Custom offset resets specified updating configs original auto offset reset {}", logPrefix, originalReset);
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
@@ -292,7 +291,7 @@ public class StreamThread extends Thread {
         // standby ktables
         this.standbyRecords = new HashMap<>();
 
-        this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
+        this.stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
         final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
         this.rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
@@ -693,7 +692,8 @@ public class StreamThread extends Thread {
     protected void maybeCommit(final long now) {
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
-            log.info("{} Committing all tasks because the commit interval {}ms has elapsed", logPrefix, commitTimeMs);
+            log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed",
+                    logPrefix, activeTasks, standbyTasks, commitTimeMs);
 
             commitAll();
             lastCommitMs = now;
@@ -717,7 +717,6 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
-        log.trace("{} Committing all its owned tasks", logPrefix);
         for (StreamTask task : activeTasks.values()) {
             commitOne(task);
         }
@@ -782,22 +781,6 @@ public class StreamThread extends Thread {
         return tasks;
     }
 
-    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
-        log.info("{} Creating active task {} with assigned partitions {}", logPrefix, id, partitions);
-
-        streamsMetrics.taskCreatedSensor.record();
-
-        final ProcessorTopology topology = builder.build(id.topicGroupId);
-        final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
-        final long start = time.milliseconds();
-        try {
-            return new StreamTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory, cache, time, recordCollector);
-        } finally {
-            log.debug("{} creation of active task {} took {} ms", logPrefix, id, time.milliseconds() - start);
-        }
-    }
-
-
     private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
         if (suspendedTasks.containsKey(taskId)) {
             final StreamTask task = suspendedTasks.get(taskId);
@@ -826,7 +809,7 @@ public class StreamThread extends Thread {
             final StreamTask task = next.getValue();
             final Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey());
             if (!task.partitions().equals(assignedPartitionsForTask)) {
-                log.debug("{} closing suspended non-assigned task", logPrefix);
+                log.debug("{} Closing suspended non-assigned active task {}", logPrefix, task.id);
                 try {
                     task.close();
                     task.closeStateManager(true);
@@ -846,13 +829,13 @@ public class StreamThread extends Thread {
         while (standByTaskIterator.hasNext()) {
             final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
             if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) {
-                log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, suspendedTask.getKey());
                 final StandbyTask task = suspendedTask.getValue();
+                log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, task.id);
                 try {
                     task.close();
                     task.closeStateManager(true);
                 } catch (Exception e) {
-                    log.error("{} Failed to remove suspended task standby {}", logPrefix, suspendedTask.getKey(), e);
+                    log.error("{} Failed to remove suspended standby task {}", logPrefix, task.id, e);
                 } finally {
                     standByTaskIterator.remove();
                 }
@@ -860,6 +843,20 @@ public class StreamThread extends Thread {
         }
     }
 
+    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
+        log.debug("{} Creating new active task {} with assigned partitions {}", logPrefix, id, partitions);
+
+        streamsMetrics.taskCreatedSensor.record();
+
+        final ProcessorTopology topology = builder.build(id.topicGroupId);
+        final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
+        try {
+            return new StreamTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory, cache, time, recordCollector);
+        } finally {
+            log.info("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions);
+        }
+    }
+
     private void addStreamTasks(Collection<TopicPartition> assignment, final long start) {
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
@@ -898,19 +895,27 @@ public class StreamThread extends Thread {
         // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedTasks(); eventually
+        log.debug("{} new active tasks to be created: {}", logPrefix, newTasks);
+
         taskCreator.retryWithBackoff(newTasks, start);
     }
 
-    StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
-        log.info("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions);
+    private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
+        log.debug("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions);
 
         streamsMetrics.taskCreatedSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStores().isEmpty()) {
-            return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
+            try {
+                return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
+            } finally {
+                log.info("{} Created standby task {} with assigned partitions {}", logPrefix, id, partitions);
+            }
         } else {
+            log.info("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, id, partitions);
+
             return null;
         }
     }
@@ -942,6 +947,8 @@ public class StreamThread extends Thread {
         // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedStandbyTasks(); eventually
+        log.debug("{} new standby tasks to be created: {}", logPrefix, newStandbyTasks);
+
         new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start);
 
         restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
@@ -1006,7 +1013,7 @@ public class StreamThread extends Thread {
         return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
-                log.info("{} Closing a task {}", StreamThread.this.logPrefix, task.id());
+                log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id());
                 task.close();
                 streamsMetrics.tasksClosedSensor.record();
             }
@@ -1017,7 +1024,7 @@ public class StreamThread extends Thread {
         return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
-                log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix, task.id());
+                log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id());
                 task.closeTopology();
                 streamsMetrics.tasksClosedSensor.record();
             }
@@ -1168,7 +1175,6 @@ public class StreamThread extends Thread {
     class TaskCreator extends AbstractTaskCreator {
         void createTask(final TaskId taskId, final Set<TopicPartition> partitions) {
-            log.debug("{} creating new task {}", logPrefix, taskId);
             final StreamTask task = createStreamTask(taskId, partitions);
 
             activeTasks.put(taskId, task);
@@ -1187,7 +1193,6 @@ public class StreamThread extends Thread {
         }
 
         void createTask(final TaskId taskId, final Set<TopicPartition> partitions) {
-            log.debug("{} creating new standby task {}", logPrefix, taskId);
             final StandbyTask task = createStandbyTask(taskId, partitions);
             updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
         }
@@ -1204,10 +1209,19 @@ public class StreamThread extends Thread {
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
+            log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.\n" +
+                    "\tassigned active tasks: {}\n" +
+                    "\tassigned standby tasks: {}\n" +
+                    "\tcurrent suspended active tasks: {}\n" +
+                    "\tcurrent suspended standby tasks: {}\n" +
+                    "\tprevious active tasks: {}",
+                    logPrefix, state, assignment,
+                    partitionAssignor.activeTasks(), partitionAssignor.standbyTasks(),
+                    suspendedTasks, suspendedStandbyTasks, prevActiveTasks);
+
             final long start = time.milliseconds();
             try {
-                log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.", logPrefix, state, assignment);
-                storeChangelogReader = new StoreChangelogReader(restoreConsumer, time, requestTimeOut);
+                storeChangelogReader = new StoreChangelogReader(getName(), restoreConsumer, time, requestTimeOut);
                 setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
                 // do this first as we may have suspended standby tasks that
                 // will become active or vice versa
@@ -1223,14 +1237,24 @@ public class StreamThread extends Thread {
                 rebalanceException = t;
                 throw t;
             } finally {
-                log.debug("{} partition assignment took {} ms", logPrefix, time.milliseconds() - start);
+                log.info("{} partition assignment took {} ms.\n" +
+                        "\tcurrent active tasks: {}\n" +
+                        "\tcurrent standby tasks: {}",
+                        logPrefix, time.milliseconds() - start,
+                        activeTasks, standbyTasks);
             }
         }
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
+            log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.\n" +
+                    "\tcurrent assigned active tasks: {}\n" +
+                    "\tcurrent assigned standby tasks: {}\n",
+                    logPrefix, state, assignment,
+                    activeTasks, standbyTasks);
+
+            final long start = time.milliseconds();
             try {
-                log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.", logPrefix, state, assignment);
                 setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
                 // suspend active tasks
@@ -1242,6 +1266,13 @@ public class StreamThread extends Thread {
             streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata());
             removeStreamTasks();
             removeStandbyTasks();
+
+            log.info("{} partition revocation took {} ms.\n" +
+                    "\tsuspended active tasks: {}\n" +
+                    "\tsuspended standby tasks: {}\n" +
+                    "\tprevious active tasks: {}\n",
+                    logPrefix, time.milliseconds() - start,
+                    suspendedTasks, suspendedStandbyTasks, prevActiveTasks);
         }
     }
 }
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 5281814..45a6488 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -96,7 +96,7 @@ public class ThreadCache {
         }
 
         cache.flush();
-        log.debug("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
+        log.trace("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
                 name, puts(), gets(), evicts(), flushes());
     }
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 97372b8..30582ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -62,7 +62,7 @@ public class GlobalStreamThreadTest {
                                                   new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
                                                   new Metrics(),
                                                   new MockTime(),
-                                                  "client");
+                                                  "clientId");
     }
 
     @Test
@@ -93,7 +93,7 @@ public class GlobalStreamThreadTest {
                                                   new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
                                                   new Metrics(),
                                                   new MockTime(),
-                                                  "client");
+                                                  "clientId");
 
         try {
             globalStreamThread.start();
 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index ead19c7..a847a94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -65,5 +65,20 @@ public class StateRestorerTest {
         assertThat(restorer.restoredOffset(), equalTo(OFFSET_LIMIT));
     }
 
+    @Test
+    public void shouldSetStartingOffsetToMinOfLimitAndOffset() throws Exception {
+        restorer.setStartingOffset(20);
+        assertThat(restorer.startingOffset(), equalTo(20L));
+        restorer.setRestoredOffset(100);
+        assertThat(restorer.restoredOffset(), equalTo(OFFSET_LIMIT));
+    }
+
+    @Test
+    public void shouldReturnCorrectNumRestoredRecords() throws Exception {
+        restorer.setStartingOffset(20);
+        restorer.setRestoredOffset(40);
+        assertThat(restorer.restoredNumRecords(), equalTo(20L));
+        restorer.setRestoredOffset(100);
+        assertThat(restorer.restoredNumRecords(), equalTo(OFFSET_LIMIT - 20));
+    }
 }
\ No newline at end of file
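As a usage note, the record count reported by the new "Completed restoring state from changelog ..." message is simply the difference between the restored offset and the starting offset, each capped at the restorer's offset limit, exactly as the tests above exercise. A small self-contained illustration of that arithmetic (the offset limit of 50 here is an assumption for the example, standing in for the test's OFFSET_LIMIT constant):

    public class RestoredRecordsExample {
        public static void main(String[] args) {
            final long offsetLimit = 50L;                            // hypothetical limit (plays the role of OFFSET_LIMIT)
            final long startingOffset = Math.min(offsetLimit, 20L);  // capped start -> 20
            final long restoredOffset = Math.min(offsetLimit, 100L); // capped end -> 50
            // mirrors StateRestorer.restoredNumRecords(): 50 - 20 = 30
            System.out.println("restored " + (restoredOffset - startingOffset) + " records");
        }
    }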