Attached is the edited code. Am I heading in the right direction? Also, I am missing something: it works well as long as the application is running and the files are created correctly, but as soon as I restart the application, fromOffset goes back to 0. Any thoughts?
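To illustrate what I mean, the resume path I expect looks roughly like this (a minimal self-contained sketch with illustrative names, not the library's API; in the real job the chosen map would be passed as fromOffsets to KafkaUtils.createDirectStream):

```scala
object ResumeSketch {
  // (topic, partition) -> offset, mirroring TopicAndPartition -> Long
  type Offsets = Map[(String, Int), Long]

  // On restart: use the offsets saved in ZooKeeper if any were found,
  // otherwise fall back to offset 0 for every known partition.
  def startingOffsets(saved: Option[Offsets], partitions: Seq[(String, Int)]): Offsets =
    saved.getOrElse(partitions.map(tp => tp -> 0L).toMap)
}
```

If the stream is always created without feeding the saved offsets back in as fromOffsets, a restart would show exactly the symptom above.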

regards
Sunita

On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarvind@gmail.com> wrote:
Thanks for confirming Cody.
To use the library, I had to do:
val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/"+ topics + "/0")
It worked well. However, I had to specify the partitionId in the zkPath. If I want the library to pick up all the partitions for a topic, without my specifying the path, is that possible out of the box or do I need to tweak it?
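What I am after, sketched (illustrative only, not the library's API): derive one offsets path per partition from the partition list, instead of hard-coding the trailing /0:

```scala
object PartitionPaths {
  // Build one ZooKeeper offsets path per partition of a topic,
  // instead of hard-coding "/consumers/topics/<topic>/0".
  def offsetPaths(topic: String, partitionIds: Seq[Int]): Seq[String] =
    partitionIds.map(p => s"/consumers/topics/$topic/$p")
}
```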

regards
Sunita


On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <cody@koeninger.org> wrote:
You are correct that you shouldn't have to worry about broker id.

I'm honestly not sure specifically what else you are asking at this point.

On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarvind@gmail.com> wrote:
> Just re-read the Kafka architecture. Something that slipped my mind is that it
> is leader based, so the topic/partitionId pair will be the same on all the
> brokers. So we do not need to consider brokerId while storing offsets. Still
> exploring the rest of the items.
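In other words (a minimal sketch, with a plain tuple standing in for TopicAndPartition): offsets can be keyed by (topic, partition) alone, with no brokerId in the key:

```scala
object OffsetKeys {
  // Since partition leadership is cluster-wide, (topic, partition) alone
  // is a sufficient key for a stored offset; no brokerId is needed.
  def store(offsets: Map[(String, Int), Long],
            topic: String, partition: Int, offset: Long): Map[(String, Int), Long] =
    offsets + ((topic, partition) -> offset)
}
```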
> regards
> Sunita
>
> On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarvind@gmail.com>
> wrote:
>>
>> Hello Experts,
>>
>> I am trying to use the saving-to-ZK design. I just saw Sudhir's comments
>> that it is an old approach. Any reasons for that? Any issues observed with
>> saving to ZK? The way we are planning to use it is:
>> 1. Following
>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>> 2. Saving to the same file with the offsetRange as part of the file. We hope
>> that there are no partial writes and that overwriting the offsetRanges is not
>> possible.
>>
>> However, I have the below doubts, which I couldn't figure out from the code
>> here -
>> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>> 1. The brokerId is not part of the OffsetRange. How will just
>> partitionId:fromOffset stay unique in a cluster with multiple brokers and
>> multiple partitions per topic?
>> 2. Do we have to specify the zkPath to include the partitionId? I tried using
>> the ZooKeeperOffsetsStore as is, and it required me to specify the
>> partitionId:
>>
>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>   "/consumers/topics/" + topics + "/0")
>>
>> For our use cases it is too limiting to include the partitionId in the path.
>> To get it to work by automatically detecting the existing partitions for a
>> given topic, I changed it as below (inspired by
>> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>>
>> /**
>>   * groupID - consumer group to get offsets for
>>   * topic - topic to get offsets for
>>   * return - comma-separated "partition:offset" pairs, or None if none found
>>   */
>> private def getOffsets(groupID: String, topic: String): Option[String] = {
>>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>>   val offsets = new mutable.HashMap[TopicAndPartition, Long]()
>>   // getPartitionsForTopics returns a map of topic -> Seq[partitionId]
>>   val partitionsByTopic = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
>>   for (partition <- partitionsByTopic.getOrElse(topic, Seq.empty)) {
>>     val partitionOffsetPath = topicDirs.consumerOffsetDir + "/" + partition
>>     val maybeOffset = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
>>     val offset = maybeOffset.map(_.toLong).getOrElse(0L)
>>     offsets.put(TopicAndPartition(topic, partition), offset)
>>   }
>>   if (offsets.isEmpty) None
>>   // serialize as "partition:offset,..." so readOffsets can parse it back
>>   else Some(offsets.map { case (tp, o) => s"${tp.partition}:${o}" }.mkString(","))
>> }
>>
>> // Read the previously saved offsets from ZooKeeper
>> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>>
>>   LogHandler.log.info("Reading offsets from ZooKeeper")
>>   val start = System.currentTimeMillis()
>>
>>   val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
>>   offsetsRangesStrOpt match {
>>     case Some(offsetsRangesStr) =>
>>       LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>>
>>       // parse "partition:offset" pairs back into a map
>>       val offsets = offsetsRangesStr.split(",")
>>         .map(s => s.split(":"))
>>         .map { case Array(partitionStr, offsetStr) =>
>>           TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong }
>>         .toMap
>>
>>       LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
>>         (System.currentTimeMillis() - start) + " ms")
>>       Some(offsets)
>>     case None =>
>>       LogHandler.log.info("No offsets found in ZooKeeper. Took " +
>>         (System.currentTimeMillis() - start) + " ms")
>>       None
>>   }
>> }
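The round trip between the string getOffsets builds and the parsing in readOffsets can be checked in isolation (a self-contained sketch; the TopicAndPartition case class here is a stand-in for Kafka's, for illustration):

```scala
object OffsetFormat {
  // Stand-in for kafka.common.TopicAndPartition (illustrative only)
  case class TopicAndPartition(topic: String, partition: Int)

  // Mirror of getOffsets' output: "partition:offset" pairs, comma separated
  def serialize(offsets: Map[TopicAndPartition, Long]): String =
    offsets.map { case (tp, o) => s"${tp.partition}:$o" }.mkString(",")

  // Mirror of readOffsets' parsing
  def parse(topic: String, s: String): Map[TopicAndPartition, Long] =
    s.split(",")
      .map(_.split(":"))
      .map { case Array(p, o) => TopicAndPartition(topic, p.toInt) -> o.toLong }
      .toMap
}
```

Whatever format saveOffsets writes has to survive this round trip, or a restart will find nothing usable in ZooKeeper.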
>>
>> However, I am concerned whether saveOffsets will work well with this
>> approach. That's when I realized we are not considering brokerIds while
>> storing offsets, and OffsetRange probably does not have them either. It
>> can only provide the topic, partition, and the from and until offsets.
>>
>> I am probably missing something very basic. Probably the library works
>> well by itself. Can someone / Cody explain?
>>
>> Cody, thanks a lot for sharing your work.
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>>>
>>> See
>>> https://github.com/koeninger/kafka-exactly-once
>>>
>>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed"
>>> <mdkhajaasmath@gmail.com> wrote:
>>>>
>>>> Hi Experts,
>>>>
>>>> I am looking for some information on how to achieve zero data loss while
>>>> working with Kafka and Spark. I have searched online and the blogs give
>>>> different answers. Please let me know if anyone has an idea on this.
>>>>
>>>> Blog 1:
>>>>
>>>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>>>>
>>>>
>>>> Blog2:
>>>>
>>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>>
>>>>
>>>> Blog 1 simply suggests a configuration change with a checkpoint directory,
>>>> while blog 2 gives details on how to save offsets to ZooKeeper. Can you
>>>> please help me with the right approach?
>>>>
>>>> Thanks,
>>>> Asmath
>>>>
>>>>
>>
>