spark-user mailing list archives

From "Shao, Saisai" <saisai.s...@intel.com>
Subject RE: kafka offset commit in spark streaming 1.2
Date Mon, 06 Jul 2015 13:00:46 GMT
Please see the inline comments.

From: Shushant Arora [mailto:shushantarora09@gmail.com]
Sent: Monday, July 6, 2015 8:51 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

So if WAL is disabled, how can a developer commit offsets explicitly in a Spark Streaming app,
since we don't write the code that is executed in the receiver?

I think it is difficult for a user to commit offsets explicitly with the receiver-based Spark
Streaming Kafka API.

If you want to commit offsets explicitly, you could try the Spark Streaming Kafka direct API,
which is newly added in Spark 1.3+ and lets you manage the offsets yourself. It is implemented
on top of Kafka’s low-level API.

Plus, since offset commitment is asynchronous, is it possible that the last offset is not yet
committed when the next stream batch starts on the receiver, so that it gets duplicate data?

Yes, it is possible, so the receiver-based Spark Streaming Kafka API cannot guarantee either
no duplication or no data loss. If you enable WAL, no data loss can be guaranteed, but you will
still see duplicates. So the best way is to use the Spark Streaming Kafka direct API with your
own offset management to achieve exactly-once semantics.
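
One way to picture "your own offset management" is a store that saves the results and the next
offset in a single atomic step, so a replayed batch is recognized and skipped. The following is
only a toy sketch under that assumption: the `Store` class and its `commitBatch` method are
hypothetical, it uses no Spark or Kafka API, and a real application would typically rely on a
database transaction in place of the synchronized method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model: results and the next offset are saved together in one atomic step. */
class ExactlyOnceSketch {

    /** Hypothetical stand-in for a transactional store (for example one DB transaction). */
    static final class Store {
        private final List<String> results = new ArrayList<>();
        private long nextOffset = 0L;

        /** Applies a batch only if it starts exactly where the last committed batch ended. */
        synchronized boolean commitBatch(long fromOffset, List<String> batch) {
            if (fromOffset != nextOffset) {
                return false; // replayed or out-of-order batch: skip it
            }
            results.addAll(batch);
            nextOffset = fromOffset + batch.size();
            return true;
        }

        synchronized long nextOffset() {
            return nextOffset;
        }

        synchronized List<String> results() {
            return new ArrayList<>(results);
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        List<String> batch = Arrays.asList("m0", "m1", "m2");
        System.out.println(store.commitBatch(0L, batch)); // first delivery is applied
        System.out.println(store.commitBatch(0L, batch)); // replay after a failure is skipped
        System.out.println(store.results());              // each message appears once
    }
}
```

Because the offset check and the write happen together, a batch redelivered after a failure
leaves the stored results unchanged.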



On Mon, Jul 6, 2015 at 6:16 PM, Shao, Saisai <saisai.shao@intel.com> wrote:
If you disable WAL, Spark Streaming itself will not manage anything offset-related. If auto
commit is set to true, Kafka itself will update offsets on a time-based schedule; if auto commit
is disabled, nothing will call commitOffset, and you need to call this API yourself.

Also, Kafka’s offset commit mechanism is actually timer-based, so it is asynchronous with
respect to replication.
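
To illustrate why a timer-based commit can lead to duplicates, here is a toy simulation that
involves no Kafka client at all. It assumes a commit fires every `commitEvery` messages (a
stand-in for a time interval) and that the consumer crashes once and restarts from the last
committed offset; the class name and all parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a periodically committed offset; no Kafka client is involved. */
class TimerCommitSketch {

    /**
     * Processes offsets [0, total) and commits only every commitEvery messages
     * (commitEvery must be positive). The consumer crashes once, right after
     * processing crashAfter messages, and restarts from the last committed offset.
     */
    static List<Long> run(long total, long commitEvery, long crashAfter) {
        List<Long> processed = new ArrayList<>();
        long committed = 0L;
        boolean crashed = false;
        long offset = 0L;
        while (offset < total) {
            processed.add(offset);
            offset++;
            if (offset % commitEvery == 0) {
                committed = offset; // the periodic commit fires
            }
            if (!crashed && offset == crashAfter) {
                crashed = true;
                offset = committed; // restart from the last committed offset
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Commit every 4 messages, crash after 6: offsets 4 and 5 are processed twice.
        System.out.println(run(8L, 4L, 6L));
    }
}
```

Anything processed between the last commit and the crash is read again after the restart, which
is the duplication described above.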

From: Shushant Arora [mailto:shushantarora09@gmail.com]
Sent: Monday, July 6, 2015 8:30 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

And what if I disable WAL and replicate receiver data using StorageLevel.MEMORY_ONLY_2()?
Will it commit the offset after replicating the message, or will it use the autocommit.enable
value? And if it uses this value, what if autocommit.enable is set to false: when does the
receiver call commitOffset?

On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai <saisai.shao@intel.com> wrote:
If you’re using WAL with Kafka, Spark Streaming will ignore this configuration (autocommit.enable)
and explicitly call commitOffset to update the offset in Kafka AFTER the WAL write is done.

No matter what you set for autocommit.enable, Spark Streaming will internally set it to false
to turn off the autocommit mechanism.

Thanks
Jerry
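
The override described above can be pictured as copying the user's parameters and forcing the
auto-commit flag to false. This is only an illustrative sketch, not Spark's actual code: the
class and the `withAutoCommitDisabled` helper are hypothetical, and the key is spelled as it
appears in this thread. Kafka 0.8's consumer configuration documents the setting as
`auto.commit.enable`, so it is worth checking which spelling your Kafka version expects.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: forces auto commit off regardless of what the user passed. */
class AutoCommitOverrideSketch {

    // Key spelled as in this thread; an assumption, check it against your Kafka version.
    static final String AUTO_COMMIT_KEY = "autocommit.enable";

    /** Returns a copy of the user's parameters with auto commit set to "false". */
    static Map<String, String> withAutoCommitDisabled(Map<String, String> userParams) {
        Map<String, String> effective = new HashMap<>(userParams);
        effective.put(AUTO_COMMIT_KEY, "false");
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("group.id", "testgroup");
        kafkaParams.put(AUTO_COMMIT_KEY, "true");
        // The user's map is left untouched; only the effective copy is changed.
        System.out.println(withAutoCommitDisabled(kafkaParams).get(AUTO_COMMIT_KEY));
    }
}
```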

From: Shushant Arora [mailto:shushantarora09@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only
after writing to the WAL if WAL and checkpointing are enabled, or does it depend on the
kafkaParams passed while initializing the Kafka DStream?


Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "10000");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");

kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
        kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
        kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));


Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always
call commitOffset explicitly on the high-level consumer connector, or does it depend on the
parameter passed?

If it depends on the parameter and the receiver calls explicit commit only when autocommit
is false, then I should override the default autocommit from true to false when enabling WAL,
since having WAL enabled with autocommit set to true may produce duplicates in case of failure.

