kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-5111: Code cleanup and improved log4j on StreamThread and Task
Date Fri, 28 Apr 2017 00:11:52 GMT
KAFKA-5111: Code cleanup and improved log4j on StreamThread and Task

 - mainly moving methods
 - also improved logging

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #2917 from mjsax/kafka-5111-code-cleanup-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6753af27
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6753af27
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6753af27

Branch: refs/heads/trunk
Commit: 6753af270c8a966411f5574afa57f6e5ffe43dbe
Parents: 36aef1c
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Apr 27 17:11:46 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Apr 27 17:11:46 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |  103 +-
 .../internals/ProcessorStateManager.java        |    1 +
 .../processor/internals/StandbyTask.java        |   63 +-
 .../streams/processor/internals/StreamTask.java |  276 ++---
 .../processor/internals/StreamThread.java       | 1101 +++++++++---------
 .../processor/internals/AbstractTaskTest.java   |    6 +-
 .../processor/internals/StreamTaskTest.java     |    2 +-
 .../processor/internals/StreamThreadTest.java   |   10 +-
 .../StreamThreadStateStoreProviderTest.java     |    2 +-
 9 files changed, 788 insertions(+), 776 deletions(-)
----------------------------------------------------------------------
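The recurring pattern in the hunks below is a per-task logPrefix that AbstractTask now builds once and that every lifecycle log message reuses, with most of those messages moved from INFO to DEBUG. A minimal sketch of the convention, with an illustrative class name that is not part of the commit:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public abstract class TaskLoggingSketch {
        private static final Logger log = LoggerFactory.getLogger(TaskLoggingSketch.class);

        // built once per task and reused by every log call, e.g. "task [1_0] Suspending"
        final String logPrefix;

        TaskLoggingSketch(final String taskId, final boolean isStandby) {
            // same format string that AbstractTask uses in the diff below
            logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", taskId);
        }

        void initializeStateStores() {
            // lifecycle messages are emitted at DEBUG with the shared prefix
            log.debug("{} Initializing state stores", logPrefix);
        }
    }

The StandbyTask and StreamTask hunks then drop their private logPrefix fields and rely on this shared one.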


http://git-wip-us.apache.org/repos/asf/kafka/blob/6753af27/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 9c58515..d546118 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -40,7 +40,7 @@ import java.util.Set;
 public abstract class AbstractTask {
     private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
 
-    protected final TaskId id;
+    private final TaskId id;
     protected final String applicationId;
     protected final ProcessorTopology topology;
     protected final Consumer consumer;
@@ -48,6 +48,8 @@ public abstract class AbstractTask {
     protected final Set<TopicPartition> partitions;
     InternalProcessorContext processorContext;
     protected final ThreadCache cache;
+    final String logPrefix;
+
     /**
      * @throws ProcessorStateException if the state manager cannot be created
      */
@@ -67,24 +69,20 @@ public abstract class AbstractTask {
         this.consumer = consumer;
         this.cache = cache;
 
+        logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id());
+
         // create the processor state manager
         try {
             stateMgr = new ProcessorStateManager(id, partitions, isStandby, stateDirectory, topology.storeToChangelogTopic(), changelogReader);
         } catch (final IOException e) {
-            throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);
+            throw new ProcessorStateException(String.format("%s Error while creating the state manager", logPrefix), e);
         }
     }
 
-    void initializeStateStores() {
-        // set initial offset limits
-        initializeOffsetLimits();
-
-        for (final StateStore store : topology.stateStores()) {
-            log.trace("task [{}] Initializing store {}", id(), store.name());
-            store.init(processorContext, store);
-        }
-
-    }
+    public abstract void resume();
+    public abstract void commit();
+    public abstract void suspend();
+    public abstract void close();
 
     public final TaskId id() {
         return id;
@@ -110,38 +108,6 @@ public abstract class AbstractTask {
         return cache;
     }
 
-    public abstract void resume();
-    public abstract void commit();
-    public abstract void suspend();
-    public abstract void close();
-
-    /**
-     * @throws ProcessorStateException if there is an error while closing the state manager
-     * @param writeCheckpoint boolean indicating if a checkpoint file should be written
-     */
-    void closeStateManager(final boolean writeCheckpoint) {
-        log.trace("task [{}] Closing", id);
-        stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
-    }
-
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
-        return Collections.emptyMap();
-    }
-
-    protected void initializeOffsetLimits() {
-        for (final TopicPartition partition : partitions) {
-            try {
-                final OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
-                stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
-            } catch (final AuthorizationException e) {
-                throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e);
-            } catch (final WakeupException e) {
-                throw e;
-            } catch (final KafkaException e) {
-                throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partition), e);
-            }
-        }
-    }
 
     public StateStore getStore(final String name) {
         return stateMgr.getStore(name);
@@ -163,7 +129,11 @@ public abstract class AbstractTask {
      * @return A string representation of the StreamTask instance.
      */
     public String toString(final String indent) {
-        final StringBuilder sb = new StringBuilder(indent + "StreamsTask taskId: " + id + "\n");
+        final StringBuilder sb = new StringBuilder();
+        sb.append(indent);
+        sb.append("StreamsTask taskId: ");
+        sb.append(id);
+        sb.append("\n");
 
         // print topology
         if (topology != null) {
@@ -182,10 +152,53 @@ public abstract class AbstractTask {
         return sb.toString();
     }
 
+    protected Map<TopicPartition, Long> recordCollectorOffsets() {
+        return Collections.emptyMap();
+    }
+
+    protected void updateOffsetLimits() {
+        log.debug("{} Updating store offset limits {}", logPrefix);
+        for (final TopicPartition partition : partitions) {
+            try {
+                final OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
+                stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
+            } catch (final AuthorizationException e) {
+                throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e);
+            } catch (final WakeupException e) {
+                throw e;
+            } catch (final KafkaException e) {
+                throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partition), e);
+            }
+        }
+    }
+
     /**
      * Flush all state stores owned by this task
      */
     void flushState() {
         stateMgr.flush();
     }
+
+    void initializeStateStores() {
+        log.debug("{} Initializing state stores", logPrefix);
+
+        // set initial offset limits
+        updateOffsetLimits();
+
+        for (final StateStore store : topology.stateStores()) {
+            log.trace("task [{}] Initializing store {}", id(), store.name());
+            store.init(processorContext, store);
+        }
+    }
+
+    /**
+     * @throws ProcessorStateException if there is an error while closing the state manager
+     * @param writeCheckpoint boolean indicating if a checkpoint file should be written
+     */
+    void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
+        log.trace("{} Closing state manager", logPrefix);
+        stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6753af27/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 5fbe819..1a0e34a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -227,6 +227,7 @@ public class ProcessorStateManager implements StateManager {
     }
 
     void putOffsetLimit(final TopicPartition partition, final long limit) {
+        log.trace("{} Updating store offset limit for partition {} to {}", logPrefix, partition,
limit);
         offsetLimits.put(partition, limit);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6753af27/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 367f035..b09c8cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -37,18 +37,18 @@ public class StandbyTask extends AbstractTask {
 
     private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
     private final Map<TopicPartition, Long> checkpointedOffsets;
-    private final String logPrefix;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
-     * @param id                    the ID of this task
-     * @param applicationId         the ID of the stream processing application
-     * @param partitions            the collection of assigned {@link TopicPartition}
-     * @param topology              the instance of {@link ProcessorTopology}
-     * @param consumer              the instance of {@link Consumer}
-     * @param config                the {@link StreamsConfig} specified by the user
-     * @param metrics               the {@link StreamsMetrics} created by the thread
-     * @param stateDirectory        the {@link StateDirectory} created by the thread
+     *
+     * @param id             the ID of this task
+     * @param applicationId  the ID of the stream processing application
+     * @param partitions     the collection of assigned {@link TopicPartition}
+     * @param topology       the instance of {@link ProcessorTopology}
+     * @param consumer       the instance of {@link Consumer}
+     * @param config         the {@link StreamsConfig} specified by the user
+     * @param metrics        the {@link StreamsMetrics} created by the thread
+     * @param stateDirectory the {@link StateDirectory} created by the thread
      */
     StandbyTask(final TaskId id,
                 final String applicationId,
@@ -64,29 +64,12 @@ public class StandbyTask extends AbstractTask {
         // initialize the topology with its own context
         processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
 
-        logPrefix = String.format("Standby Task [%s]", id());
-
-        log.info("{} Initializing", logPrefix);
+        log.debug("{} Initializing", logPrefix);
         initializeStateStores();
-
         processorContext.initialized();
-
         checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
     }
 
-    Map<TopicPartition, Long> checkpointedOffsets() {
-        return checkpointedOffsets;
-    }
-
-    /**
-     * Updates a state store using records from one change log partition
-     * @return a list of records not consumed
-     */
-    public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
final List<ConsumerRecord<byte[], byte[]>> records) {
-        log.debug("{} Updating standby replicas of its state store for partition [{}]", logPrefix,
partition);
-        return stateMgr.updateStandbyStates(partition, records);
-    }
-
     /**
      * <pre>
      * - update offset limits
@@ -94,8 +77,8 @@ public class StandbyTask extends AbstractTask {
      */
     @Override
     public void resume() {
-        log.info("{} " + "Resuming", logPrefix);
-        initializeOffsetLimits();
+        log.debug("{} " + "Resuming", logPrefix);
+        updateOffsetLimits();
     }
 
     /**
@@ -111,7 +94,7 @@ public class StandbyTask extends AbstractTask {
         stateMgr.flush();
         stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
         // reinitialize offset limits
-        initializeOffsetLimits();
+        updateOffsetLimits();
     }
 
     /**
@@ -122,14 +105,14 @@ public class StandbyTask extends AbstractTask {
      */
     @Override
     public void suspend() {
-        log.info("{} Suspending", logPrefix);
+        log.debug("{} Suspending", logPrefix);
         stateMgr.flush();
         stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
     }
 
     @Override
     public void close() {
-        log.info("{} Closing", logPrefix);
+        log.debug("{} Closing", logPrefix);
         boolean committedSuccessfully = false;
         try {
             commit();
@@ -139,4 +122,18 @@ public class StandbyTask extends AbstractTask {
         }
     }
 
-}
+    /**
+     * Updates a state store using records from one change log partition
+     *
+     * @return a list of records not consumed
+     */
+    public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
final List<ConsumerRecord<byte[], byte[]>> records) {
+        log.debug("{} Updating standby replicas of its state store for partition [{}]", logPrefix,
partition);
+        return stateMgr.updateStandbyStates(partition, records);
+    }
+
+    Map<TopicPartition, Long> checkpointedOffsets() {
+        return checkpointedOffsets;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6753af27/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 75bdbab..8b60b2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -51,7 +51,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
 
-    private final String logPrefix;
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
     private final PunctuationQueue punctuationQueue;
@@ -66,6 +65,22 @@ public class StreamTask extends AbstractTask implements Punctuator {
     private final Time time;
     private final TaskMetrics metrics;
 
+    protected class TaskMetrics  {
+        final StreamsMetricsImpl metrics;
+        final Sensor taskCommitTimeSensor;
+
+
+        TaskMetrics(final StreamsMetrics metrics) {
+            final String name = id().toString();
+            this.metrics = (StreamsMetricsImpl) metrics;
+            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
+        }
+
+        void removeAllSensors() {
+            metrics.removeSensor(taskCommitTimeSensor);
+        }
+    }
+
     /**
      * Create {@link StreamTask} with its assigned partitions
      * @param id                    the ID of this task
@@ -105,12 +120,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
         for (final TopicPartition partition : partitions) {
             final SourceNode source = topology.source(partition.topic());
-            final RecordQueue queue = createRecordQueue(partition, source, timestampExtractor);
+            final RecordQueue queue = new RecordQueue(partition, source, timestampExtractor);
             partitionQueues.put(partition, queue);
         }
 
-        logPrefix = String.format("task [%s]", id);
-
         partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
         // initialize the consumed offset cache
@@ -122,7 +135,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // initialize the topology with its own context
         processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
         this.time = time;
-        log.info("{} Initialize task", logPrefix);
+        log.debug("{} Initializing", logPrefix);
         initializeStateStores();
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
         initTopology();
@@ -130,34 +143,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
     }
 
     /**
-     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
-     * and not added to the queue for processing
-     *
-     * @param partition the partition
-     * @param records  the records
-     * @return the number of added records
-     */
-    @SuppressWarnings("unchecked")
-    public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        final int oldQueueSize = partitionGroup.numBuffered();
-        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
-
-        log.trace("{} Added records into the buffered queue of partition {}, new queue size
is {}", logPrefix, partition, newQueueSize);
-
-        // if after adding these records, its partition queue's buffered size has been
-        // increased beyond the threshold, we can then pause the consumption for this partition
-        if (newQueueSize > maxBufferedSize) {
-            consumer.pause(singleton(partition));
-        }
-
-        return newQueueSize - oldQueueSize;
-    }
-
-    /**
-     * @return The number of records left in the buffer of this task's partition group
+     * re-initialize the task
      */
-    int numBuffered() {
-        return partitionGroup.numBuffered();
+    @Override
+    public void resume() {
+        log.debug("{} Resuming", logPrefix);
+        initTopology();
     }
 
     /**
@@ -181,8 +172,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
             final TopicPartition partition = recordInfo.partition();
 
             log.trace("{} Start processing one record [{}]", logPrefix, record);
-            final ProcessorRecordContext recordContext = createRecordContext(record);
-            updateProcessorContext(recordContext, currNode);
+            updateProcessorContext(record, currNode);
             currNode.process(record.key(), record.value());
 
             log.trace("{} Completed processing one record [{}]", logPrefix, record);
@@ -198,12 +188,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
             }
         } catch (final KafkaException e) {
             throw new StreamsException(format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d",
-                                              id.toString(),
-                                              processorContext.currentNode().name(),
-                                              record.topic(),
-                                              record.partition(),
-                                              record.offset()
-                                              ), e);
+                id(),
+                processorContext.currentNode().name(),
+                record.topic(),
+                record.partition(),
+                record.offset()
+            ), e);
         } finally {
             processorContext.setCurrentNode(null);
         }
@@ -211,27 +201,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
         return true;
     }
 
-    private void updateProcessorContext(final ProcessorRecordContext recordContext, final ProcessorNode currNode) {
-        processorContext.setRecordContext(recordContext);
-        processorContext.setCurrentNode(currNode);
-    }
-
-    /**
-     * Possibly trigger registered punctuation functions if
-     * current partition group timestamp has reached the defined stamp
-     */
-    boolean maybePunctuate() {
-        final long timestamp = partitionGroup.timestamp();
-
-        // if the timestamp is not known yet, meaning there is not enough data accumulated
-        // to reason stream partition time, then skip.
-        if (timestamp == TimestampTracker.NOT_KNOWN) {
-            return false;
-        } else {
-            return punctuationQueue.mayPunctuate(timestamp, this);
-        }
-    }
-
     /**
      * @throws IllegalStateException if the current node is not null
      */
@@ -241,20 +210,24 @@ public class StreamTask extends AbstractTask implements Punctuator {
             throw new IllegalStateException(String.format("%s Current node is not null", logPrefix));
         }
 
-        final StampedRecord stampedRecord = new StampedRecord(DUMMY_RECORD, timestamp);
-        updateProcessorContext(createRecordContext(stampedRecord), node);
+        updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
 
         log.trace("{} Punctuating processor {} with timestamp {}", logPrefix, node.name(),
timestamp);
 
         try {
             node.punctuate(timestamp);
         } catch (final KafkaException e) {
-            throw new StreamsException(String.format("Exception caught in punctuate. taskId=%s processor=%s", id,  node.name()), e);
+            throw new StreamsException(String.format("%s Exception caught while punctuating processor '%s'", logPrefix,  node.name()), e);
         } finally {
             processorContext.setCurrentNode(null);
         }
     }
 
+    private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
+        processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
+        processorContext.setCurrentNode(currNode);
+    }
+
     /**
      * <pre>
      *  - flush state and producer
@@ -278,33 +251,21 @@ public class StreamTask extends AbstractTask implements Punctuator {
             metrics.taskCommitTimeSensor);
     }
 
-    /**
-     * <pre>
-     *  - close topology
-     *  - {@link #commit()}
-     *    - flush state and producer
-     *    - write checkpoint
-     *    - commit offsets
-     * </pre>
-     */
     @Override
-    public void suspend() {
-        log.info("{} Suspending task", logPrefix);
-        closeTopology();
-        commit();
+    protected Map<TopicPartition, Long> recordCollectorOffsets() {
+        return recordCollector.offsets();
     }
 
-    /**
-     * re-initialize the task
-     */
-    public void resume() {
-        log.info("{} Resuming task", logPrefix);
-        initTopology();
+    @Override
+    protected void flushState() {
+        log.trace("{} Flushing state and producer", logPrefix);
+        super.flushState();
+        recordCollector.flush();
     }
 
     private void commitOffsets() {
-        log.trace("{} Committing offsets", logPrefix);
         if (commitOffsetNeeded) {
+            log.debug("{} Committing offsets", logPrefix);
             final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata
= new HashMap<>(consumedOffsets.size());
             for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet())
{
                 final TopicPartition partition = entry.getKey();
@@ -324,37 +285,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         commitRequested = false;
     }
 
-    /**
-     * Whether or not a request has been made to commit the current state
-     */
-    boolean commitNeeded() {
-        return commitRequested;
-    }
-
-    /**
-     * Request committing the current task's state
-     */
-    void needCommit() {
-        commitRequested = true;
-    }
-
-    /**
-     * Schedules a punctuation for the processor
-     *
-     * @param interval  the interval in milliseconds
-     * @throws IllegalStateException if the current node is not null
-     */
-    public void schedule(final long interval) {
-        if (processorContext.currentNode() == null) {
-            throw new IllegalStateException(String.format("%s Current node is null", logPrefix));
-        }
-
-        punctuationQueue.schedule(new PunctuationSchedule(processorContext.currentNode(), interval));
-    }
-
     private void initTopology() {
         // initialize the task by initializing all its processor nodes in the topology
-        log.info("{} Initializing processor nodes of the topology", logPrefix);
+        log.debug("{} Initializing processor nodes of the topology", logPrefix);
         for (final ProcessorNode node : topology.processors()) {
             processorContext.setCurrentNode(node);
             try {
@@ -365,6 +298,22 @@ public class StreamTask extends AbstractTask implements Punctuator {
         }
     }
 
+    /**
+     * <pre>
+     *  - close topology
+     *  - {@link #commit()}
+     *    - flush state and producer
+     *    - write checkpoint
+     *    - commit offsets
+     * </pre>
+     */
+    @Override
+    public void suspend() {
+        log.debug("{} Suspending", logPrefix);
+        closeTopology();
+        commit();
+    }
+
     private void closeTopology() {
         log.debug("{} Closing processor topology", logPrefix);
 
@@ -402,7 +351,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     @Override
     public void close() {
-        log.info("{} Closing task", logPrefix);
+        log.debug("{} Closing", logPrefix);
         try {
             suspend();
             closeStateManager(true);
@@ -422,58 +371,87 @@ public class StreamTask extends AbstractTask implements Punctuator {
         }
     }
 
-    @Override
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
-        return recordCollector.offsets();
-    }
-
+    /**
+     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
+     * and not added to the queue for processing
+     *
+     * @param partition the partition
+     * @param records  the records
+     * @return the number of added records
+     */
     @SuppressWarnings("unchecked")
-    private RecordQueue createRecordQueue(final TopicPartition partition, final SourceNode source, final TimestampExtractor timestampExtractor) {
-        return new RecordQueue(partition, source, timestampExtractor);
-    }
+    public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        final int oldQueueSize = partitionGroup.numBuffered();
+        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
-    private ProcessorRecordContext createRecordContext(final StampedRecord currRecord) {
-        return new ProcessorRecordContext(currRecord.timestamp, currRecord.offset(), currRecord.partition(), currRecord.topic());
-    }
+        log.trace("{} Added records into the buffered queue of partition {}, new queue size
is {}", logPrefix, partition, newQueueSize);
 
-    // Visible for testing
-    ProcessorContext processorContext() {
-        return processorContext;
+        // if after adding these records, its partition queue's buffered size has been
+        // increased beyond the threshold, we can then pause the consumption for this partition
+        if (newQueueSize > maxBufferedSize) {
+            consumer.pause(singleton(partition));
+        }
+
+        return newQueueSize - oldQueueSize;
     }
 
     /**
-     * Produces a string representation contain useful information about a StreamTask.
-     * This is useful in debugging scenarios.
-     * @return A string representation of the StreamTask instance.
+     * Schedules a punctuation for the processor
+     *
+     * @param interval  the interval in milliseconds
+     * @throws IllegalStateException if the current node is not null
      */
-    public String toString() {
-        return super.toString();
+    public void schedule(final long interval) {
+        if (processorContext.currentNode() == null) {
+            throw new IllegalStateException(String.format("%s Current node is null", logPrefix));
+        }
+
+        punctuationQueue.schedule(new PunctuationSchedule(processorContext.currentNode(), interval));
     }
 
-    protected class TaskMetrics  {
-        final StreamsMetricsImpl metrics;
-        final Sensor taskCommitTimeSensor;
+    /**
+     * @return The number of records left in the buffer of this task's partition group
+     */
+    int numBuffered() {
+        return partitionGroup.numBuffered();
+    }
 
+    /**
+     * Possibly trigger registered punctuation functions if
+     * current partition group timestamp has reached the defined stamp
+     */
+    boolean maybePunctuate() {
+        final long timestamp = partitionGroup.timestamp();
 
-        public TaskMetrics(final StreamsMetrics metrics) {
-            final String name = id.toString();
-            this.metrics = (StreamsMetricsImpl) metrics;
-            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
+        // if the timestamp is not known yet, meaning there is not enough data accumulated
+        // to reason stream partition time, then skip.
+        if (timestamp == TimestampTracker.NOT_KNOWN) {
+            return false;
+        } else {
+            return punctuationQueue.mayPunctuate(timestamp, this);
         }
+    }
 
-        void removeAllSensors() {
-            metrics.removeSensor(taskCommitTimeSensor);
-        }
+    /**
+     * Request committing the current task's state
+     */
+    void needCommit() {
+        commitRequested = true;
     }
 
-    @Override
-    void flushState() {
-        log.trace("{} Flushing state and producer topology", logPrefix);
-        super.flushState();
-        recordCollector.flush();
+    /**
+     * Whether or not a request has been made to commit the current state
+     */
+    boolean commitNeeded() {
+        return commitRequested;
+    }
+
+    // visible for testing only
+    ProcessorContext processorContext() {
+        return processorContext;
     }
 
-    // for testing only
+    // visible for testing only
     Producer<byte[], byte[]> producer() {
         return ((RecordCollectorImpl) recordCollector).producer();
     }

