apex-commits mailing list archives

From rama...@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input Operator
Date Thu, 13 Oct 2016 17:41:13 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 07812a903 -> 352e2d92c


APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bd502e7b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bd502e7b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bd502e7b

Branch: refs/heads/master
Commit: bd502e7b9f1df2c105d7f9a8044de439463aa299
Parents: 07812a9
Author: chaitanya <chaithu@apache.org>
Authored: Tue Sep 27 22:01:50 2016 +0530
Committer: Thomas Weise <thw@apache.org>
Committed: Thu Oct 13 08:37:30 2016 -0700

----------------------------------------------------------------------
 docs/operators/kafkaInputOperator.md | 146 +++++++++++++++++++++++++++++-
 1 file changed, 145 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bd502e7b/docs/operators/kafkaInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index 1d2258e..cb29e5d 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -11,7 +11,9 @@ Kafka is a pull-based and distributed publish subscribe messaging system,
topics
 nodes. Kafka input operator is needed when you want to read data from multiple
 partitions of a Kafka topic in parallel in an Apex application.
 
-### AbstractKafkaInputOperator
+### 0.8 Version of Kafka Input Operator
+
+### AbstractKafkaInputOperator (Package: com.datatorrent.contrib.kafka)
 
 This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn't have any ports.
 
@@ -280,3 +282,145 @@ Below is the configuration for “test” Kafka topic name and
 <value>localhost:2181</value>
 </property>
 ```
+
+
+### 0.9 Version of Kafka Input Operator
+
+### AbstractKafkaInputOperator (Package: org.apache.apex.malhar.kafka)
+
+This version uses the new 0.9 Kafka consumer API; its features are described here. The operator is fault-tolerant and scalable, and supports multiple clusters and multiple topics.
+
+#### Pre-requisites
+
+This operator requires version 0.9.0 or later of the Kafka Consumer API.
+
+#### Ports
+
+This abstract class doesn't have any ports.
+
+#### Configuration properties
+
+-   ***clusters*** - String[]
+    -   Mandatory Parameter.
+    -   Specifies the Kafka clusters to consume messages from. For multi-cluster support, list the clusters separated by ";".
+    
+-   ***topics*** - String[]
+    -   Mandatory Parameter.
+    -   Specifies the Kafka topics to consume messages from. For multi-topic support, list the topics separated by ",".
+
+-   ***strategy*** - PartitionStrategy
+    -   The operator supports two partitioning strategies: ONE_TO_ONE and ONE_TO_MANY.
+
+        ONE_TO_ONE: The AppMaster creates one input operator instance per Kafka topic partition, so the number of operator instances equals the number of Kafka topic partitions.
+        ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances, where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If N is less than initialPartitionCount, the AppMaster creates one input operator instance per topic partition. For example, if initialPartitionCount = 5 and N = 2, the AppMaster creates 2 Kafka input operator instances.
+        Default Value = PartitionStrategy.ONE_TO_ONE.
+
+-   ***initialPartitionCount*** - Integer
+    -   When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances.
+        Default Value = 1.
+
+-   ***repartitionInterval*** - Long
+    -   Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions.
+        Default Value = 30 Seconds.
+
+-   ***repartitionCheckInterval*** - Long
+    -   Interval specified in milliseconds. This value specifies the minimum interval between two stat checks.
+        Default Value = 5 Seconds.
+
+-   ***maxTuplesPerWindow*** - Integer
+    -   Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1.
+        Default value = MAX_VALUE
+
+-   ***initialOffset*** - InitialOffset
+    -   Indicates the initial offset type: EARLIEST, LATEST, APPLICATION_OR_EARLIEST, or APPLICATION_OR_LATEST.
+        LATEST => The operator consumes messages from the latest point of the Kafka queue.
+        EARLIEST => The operator consumes messages from the beginning of the Kafka queue.
+        APPLICATION_OR_EARLIEST => The operator consumes messages from the position committed in the last run. If there is no committed offset, it starts consuming from the beginning of the Kafka queue.
+        APPLICATION_OR_LATEST => The operator consumes messages from the position committed in the last run. If there is no committed offset, it starts consuming from the latest position of the queue.
+        Default value = InitialOffset.APPLICATION_OR_LATEST
+
+-   ***metricsRefreshInterval*** - Long
+    -   Interval specified in milliseconds. This value specifies the minimum interval between two metric stat updates.
+        Default value = 5 Seconds.
+
+-   ***consumerTimeout*** - Long
+    -   Specifies how long poll waits when no data is available. Please refer to the link below:
+        http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll
+        Default value = 5 Seconds.
+
+-   ***holdingBufferSize*** - Long
+    -   Indicates the maximum number of messages kept in memory for emitting.
+        Default value = 1024.
+
+-   ***consumerProps*** - Properties
+    -   Specifies additional consumer properties that are not otherwise set on the operator. Please refer to the link below for consumer properties:
+        http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+
+-   ***windowDataManager*** - WindowDataManager
+    -   If set, the operator processes the same set of messages in a window before and after a failure. This idempotency is important, but it comes at a higher cost because at the end of each window the operator needs to persist some state with respect to that window.
+        Default value = WindowDataManager.NoopWindowDataManager.
+        
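The ONE_TO_MANY arithmetic described above can be sketched in plain Java. This is an illustrative simulation only, not part of the operator API; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionMath {
  // K = min(initialPartitionCount, N), where N is the Kafka partition count.
  static int operatorCount(int initialPartitionCount, int n) {
    return Math.min(initialPartitionCount, n);
  }

  // Assigns N topic partitions to K operator instances in round-robin fashion.
  static List<List<Integer>> assign(int k, int n) {
    List<List<Integer>> assignment = new ArrayList<>();
    for (int i = 0; i < k; i++) {
      assignment.add(new ArrayList<Integer>());
    }
    for (int p = 0; p < n; p++) {
      assignment.get(p % k).add(p);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // Example from the text: initialPartitionCount = 5, N = 2 -> 2 instances.
    System.out.println(operatorCount(5, 2));            // 2
    // initialPartitionCount = 2, N = 5 -> 2 instances, partitions shared round-robin.
    System.out.println(assign(operatorCount(2, 5), 5)); // [[0, 2, 4], [1, 3]]
  }
}
```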
+#### Abstract Methods
+
+void emitTuple(String cluster, ConsumerRecord&lt;byte[], byte[]&gt; message): Abstract method that emits tuples extracted from the Kafka message.
+
+### Concrete Classes
+
+#### KafkaSinglePortInputOperator
+This class extends AbstractKafkaInputOperator and implements the emitTuple() method, which extracts the byte array from the Kafka message.
+
+#### Ports
+outputPort &lt;byte[]&gt;: Tuples extracted from Kafka messages are emitted through this port.
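Since the port emits raw byte arrays, a downstream operator typically deserializes the payload. A minimal sketch, assuming the messages are UTF-8 encoded text (the class name is illustrative):

```java
import java.nio.charset.StandardCharsets;

public class DecodeExample {
  // Converts a raw Kafka message payload to a String, assuming UTF-8 encoding.
  static String decode(byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    System.out.println(decode(payload)); // hello
  }
}
```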
+
+### Application Example
+This section builds an Apex application using Kafka input operator.
+Below is the code snippet:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+@ApplicationAnnotation(name = "KafkaApp")
+public class ExampleKafkaApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    // Reads messages from the configured Kafka clusters and topics
+    KafkaSinglePortInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortInputOperator());
+
+    // Writes the received tuples to the console
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+    dag.addStream("MessageData", input.outputPort, output.input);
+  }
+}
+```
+Below is the configuration for the Kafka topic “test” with “localhost:9092” as the broker:
+
+```xml
+<property>
+<name>dt.operator.MessageReader.prop.topics</name>
+<value>test</value>
+</property>
+
+&lt;property&gt;
+&lt;name&gt;dt.operator.MessageReader.prop.clusters&lt;/name&gt;
+&lt;value&gt;localhost:9092&lt;/value&gt;
+&lt;/property&gt;
+```
+
+By adding the following lines to the properties file, the Kafka input operator supports multiple topics and clusters:
+ 
+```xml
+<property>
+<name>dt.operator.MessageReader.prop.topics</name>
+<value>test1, test2</value>
+</property>
+ 
+&lt;property&gt;
+&lt;name&gt;dt.operator.MessageReader.prop.clusters&lt;/name&gt;
+&lt;value&gt;localhost:9092; localhost:9093; localhost:9094&lt;/value&gt;
+&lt;/property&gt;
+```
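The separator conventions shown above (";" between clusters, "," between topics) can be illustrated with a small parsing sketch. This is an assumption-level illustration of how such values split, with whitespace trimmed around each entry; the operator's actual parsing may differ:

```java
public class SeparatorExample {
  // Splits a property value on the given separator, trimming each entry.
  static String[] split(String value, String sep) {
    String[] parts = value.split(sep);
    for (int i = 0; i < parts.length; i++) {
      parts[i] = parts[i].trim();
    }
    return parts;
  }

  public static void main(String[] args) {
    // Multi-cluster values use ";", multi-topic values use ","
    String clusters = "localhost:9092; localhost:9093; localhost:9094";
    String topics = "test1, test2";
    System.out.println(java.util.Arrays.toString(split(clusters, ";")));
    System.out.println(java.util.Arrays.toString(split(topics, ",")));
  }
}
```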
+
+For more details about the example application, please refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka.

