Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 933B41802E for ; Tue, 2 Feb 2016 18:52:54 +0000 (UTC) Received: (qmail 92093 invoked by uid 500); 2 Feb 2016 18:52:46 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 92064 invoked by uid 500); 2 Feb 2016 18:52:46 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 92055 invoked by uid 99); 2 Feb 2016 18:52:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Feb 2016 18:52:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4E247DFC90; Tue, 2 Feb 2016 18:52:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: MINOR: some javadocs for kstream public api Date: Tue, 2 Feb 2016 18:52:46 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk 86a9036a7 -> 8189f9d58 MINOR: some javadocs for kstream public api guozhangwang Author: Yasuhiro Matsuda Reviewers: Guozhang Wang Closes #844 from ymatsuda/javadoc Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8189f9d5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8189f9d5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8189f9d5 Branch: refs/heads/trunk Commit: 8189f9d58002ec0092737741bf6c74eebab4dc73 Parents: 86a9036 Author: Yasuhiro Matsuda Authored: Tue Feb 2 10:52:43 2016 -0800 Committer: Guozhang Wang Committed: Tue Feb 2 10:52:43 2016 -0800 ---------------------------------------------------------------------- .../streams/processor/AbstractProcessor.java | 10 ++--- .../processor/DefaultPartitionGrouper.java | 6 +++ .../streams/processor/PartitionGrouper.java | 8 ++-- .../streams/processor/ProcessorContext.java | 41 ++++++++++++++++++++ .../streams/processor/TimestampExtractor.java | 2 +- 5 files changed, 58 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java index 01d0024..1932e5e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor; /** * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op * implementations of {@link #punctuate(long)} and {@link #close()}. - * + * * @param the type of keys * @param the type of values */ @@ -41,11 +41,11 @@ public abstract class AbstractProcessor implements Processor { *

* This method does nothing by default; if desired, subclasses should override it with custom functionality. *

- * - * @param streamTime the stream time when this method is being called + * + * @param timestamp the wallclock time when this method is being called */ @Override - public void punctuate(long streamTime) { + public void punctuate(long timestamp) { // do nothing } @@ -62,7 +62,7 @@ public abstract class AbstractProcessor implements Processor { /** * Get the processor's context set during {@link #init(ProcessorContext) initialization}. - * + * * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}. */ protected final ProcessorContext context() { http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 47c5e58..57df685 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -29,6 +29,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +/** + * DefaultPartitionGrouper groups partitions by the partition id. This behavior is assumed by the join processing in KStream. + * Join processing requires that topics are copartitoned, i.e., being partitioned by the same key and having the same + * number of partitions, are grouped together. Copartitioning is ensured by having the same number of partitions on + * joined topics, and by using the serialization and Producer's default partitioner. + */ public class DefaultPartitionGrouper implements PartitionGrouper { public Map> partitionGroups(Map> topicGroups, Cluster metadata) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index a40a1c4..f8311e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -26,11 +26,13 @@ import java.util.Set; public interface PartitionGrouper { /** - * Returns a map of task ids to groups of partitions. + * Returns a map of task ids to groups of partitions. A partition group forms a task, thus, partitions that are + * expected to be processed together must be in the same group. DefaultPartitionGrouper implements this + * interface. See {@link DefaultPartitionGrouper} for more information. * - * @param topicGroups The subscribed topic groups + * @param topicGroups The map from the {@link TopologyBuilder#topicGroups() topic group} id to topics * @param metadata Metadata of the consuming cluster * @return a map of task ids to groups of partitions */ Map> partitionGroups(Map> topicGroups, Cluster metadata); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 41e2235..af98300 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -83,19 +83,60 @@ public interface ProcessorContext { StateStore getStateStore(String name); + /** + * Schedules a periodic operation for processors. A processor may call this method during + * {@link Processor#init(ProcessorContext) initialization} to + * schedule a periodic call called a punctuation to {@link Processor#punctuate(long)}. + * + * @param interval the time interval between punctuations + */ void schedule(long interval); + /** + * Forwards a key/value pair to the downstream processors + * @param key key + * @param value value + */ void forward(K key, V value); + /** + * Forwards a key/value pair to one of the downstream processors designated by childIndex + * @param key key + * @param value value + */ void forward(K key, V value, int childIndex); + /** + * Requests a commit + */ void commit(); + /** + * Returns the topic name of the current input record + * + * @return the topic name + */ String topic(); + /** + * Returns the partition id of the current input record + * + * @return the partition id + */ int partition(); + /** + * Returns the offset of the current input record + * + * @return the offset + */ long offset(); + /** + * Returns the timestamp of the current input record. The timestamp is extracted from + * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}. + * + * @return the timestamp + */ long timestamp(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8189f9d5/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 62098f2..ce0ba70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; /** - * An interface that allows the KStream framework to extract a timestamp from a key-value pair + * An interface that allows the Kafka Streams framework to extract a timestamp from an instance of {@link ConsumerRecord} */ public interface TimestampExtractor {