ignite-issues mailing list archives

From "Roman Shtykh (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (IGNITE-2016) Update KafkaStreamer to fit new features introduced in Kafka 0.9
Date Fri, 29 Jan 2016 02:32:39 GMT

    [ https://issues.apache.org/jira/browse/IGNITE-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122791#comment-15122791 ]

Roman Shtykh edited comment on IGNITE-2016 at 1/29/16 2:32 AM:
---------------------------------------------------------------

Vishal,

Here is the snippet I prepared for the Kafka Streamer part. If you have any suggestions, please
share them with me ;)

Streaming data with Ignite Kafka Streamer Module
----------------------------------------------------------------

If you use Maven to manage your project's dependencies, first add the Kafka Streamer module
dependency as follows (replace '${ignite.version}' with the actual Ignite version you are
interested in):

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-kafka</artifactId>
            <version>${ignite.version}</version>
        </dependency>
        ...
    </dependencies>
    ...
</project>

For a cache with *String* keys and *String* values, the streamer can be started as follows:

{noformat}
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
    // allow overwriting cache data
    stmr.allowOverwrite(true);
    
    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);
    
    // set the topic
    kafkaStreamer.setTopic(someKafkaTopic);

    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(4);
    
    // set Kafka consumer configurations
    kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);
    
    // set decoders
    kafkaStreamer.setKeyDecoder(strDecoder);
    kafkaStreamer.setValueDecoder(strDecoder);
    
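    kafkaStreamer.start();

    // NOTE: leaving this block closes stmr and then runs stop() in finally,
    // so keep the block open for as long as data should be streamed.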
}
finally {
    kafkaStreamer.stop();
}
{noformat}
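
One thing that may be worth illustrating about the snippet above is its lifecycle: with try-with-resources, the data streamer is closed as soon as the try block is left, and only then does the finally block stop the Kafka streamer. Below is a minimal sketch of that ordering using only the JDK. FakeDataStreamer and FakeKafkaStreamer are hypothetical stand-ins I made up for illustration; they are not Ignite classes and only record events.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamerLifecycleSketch {
    /** Records lifecycle events in the order they happen. */
    static final List<String> events = new ArrayList<>();

    /** Hypothetical stand-in for IgniteDataStreamer: only records when it is closed. */
    static class FakeDataStreamer implements AutoCloseable {
        @Override public void close() {
            events.add("data streamer closed");
        }
    }

    /** Hypothetical stand-in for KafkaStreamer: only records start and stop. */
    static class FakeKafkaStreamer {
        void start() { events.add("kafka streamer started"); }
        void stop() { events.add("kafka streamer stopped"); }
    }

    /** Runs the same try/finally shape as the snippet above and returns the recorded events. */
    static List<String> run() {
        events.clear();
        FakeKafkaStreamer kafkaStreamer = new FakeKafkaStreamer();

        try (FakeDataStreamer stmr = new FakeDataStreamer()) {
            kafkaStreamer.start();
            // A real application would block here (for example on a latch or a
            // shutdown signal) for as long as data should keep flowing.
            events.add("try block left");
        }
        finally {
            kafkaStreamer.stop();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded order is: started, try block left, data streamer closed, kafka streamer stopped. So if nothing keeps the try block open, the streamer is shut down right after it starts.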

For detailed information on Kafka consumer properties, refer to http://kafka.apache.org/documentation.html
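
As a rough illustration of what could go into kafkaConsumerConfig, here is a minimal sketch that builds the properties for the ZooKeeper-based high-level consumer (the one used before Kafka 0.9). The class name, method name, addresses, group id and chosen values are all assumptions for the example. The sketch uses only the JDK, so the wrapping step into Kafka's own config class is shown as a comment.

```java
import java.util.Properties;

public class KafkaConsumerPropsSketch {
    /**
     * Builds properties for the ZooKeeper-based high-level consumer.
     * The values chosen here are illustrative assumptions, not recommendations.
     */
    static Properties consumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();

        // ZooKeeper connection string used by the high-level consumer
        props.setProperty("zookeeper.connect", zkConnect);

        // consumer group this streamer belongs to
        props.setProperty("group.id", groupId);

        // start from the earliest available offset when no offset is stored
        props.setProperty("auto.offset.reset", "smallest");

        // how often consumed offsets are committed, in milliseconds
        props.setProperty("auto.commit.interval.ms", "1000");

        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:2181", "ignite-kafka-group");
        props.list(System.out);

        // Assumption: with the Kafka 0.8.x client on the classpath, these properties
        // would be wrapped as new kafka.consumer.ConsumerConfig(props) and passed to
        // kafkaStreamer.setConsumerConfig(...); strDecoder could then be a
        // kafka.serializer.StringDecoder.
    }
}
```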



> Update KafkaStreamer to fit new features introduced in Kafka 0.9
> ----------------------------------------------------------------
>
>                 Key: IGNITE-2016
>                 URL: https://issues.apache.org/jira/browse/IGNITE-2016
>             Project: Ignite
>          Issue Type: New Feature
>          Components: streaming
>            Reporter: Roman Shtykh
>            Assignee: Roman Shtykh
>
> Particularly,
> - new consumer
> - Kafka Connect (Copycat)
> http://www.confluent.io/blog/apache-kafka-0.9-is-released
> This can be a different integration task or a complete re-write of the current implementation,
> considering the fact that Kafka Connect is a new standard way for "large-scale, real-time
> data import and export for Kafka."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
