spark-user mailing list archives

From Sunita Arvind <sunitarv...@gmail.com>
Subject Re: Zero Data Loss in Spark with Kafka
Date Tue, 25 Oct 2016 20:52:55 GMT
Thanks for confirming, Cody.
To use the library, I had to do:

val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
  "/consumers/topics/" + topics + "/0")

It worked well. However, I had to specify the partitionId in the zkPath.
If I want the library to pick up all the partitions for a topic without me
specifying the path, is that possible out of the box, or do I need to tweak it?
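
In case it helps, here is roughly what I have in mind (a sketch, not tested;
it assumes Kafka 0.8.x's ZkUtils and ZKStringSerializer are on the classpath,
and zkHosts/topic are illustrative names):

import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

// Enumerate the topic's partitions from ZooKeeper instead of hardcoding "/0".
val zkHosts  = conf.getString("zkHosts")
val zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer)
val partitions: Seq[Int] =
  ZkUtils.getPartitionsForTopics(zkClient, Seq(topic)).getOrElse(topic, Seq.empty)

// One store per partition, so no partition id is hardcoded in the config.
val storesByPartition = partitions.map { p =>
  p -> new ZooKeeperOffsetsStore(zkHosts, s"/consumers/topics/$topic/$p")
}.toMap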

regards
Sunita


On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <cody@koeninger.org> wrote:

> You are correct that you shouldn't have to worry about broker id.
>
> I'm honestly not sure specifically what else you are asking at this point.
>
> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarvind@gmail.com>
> wrote:
> > Just re-read the Kafka architecture. Something that slipped my mind is
> > that it is leader-based, so a topic/partitionId pair will be the same on
> > all the brokers. So we do not need to consider the brokerId while storing
> > offsets. Still exploring the rest of the items.
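> >
> > For example, a map keyed only by (topic, partition) is already unique
> > cluster-wide, something like (illustrative values):
> >
> >   import kafka.common.TopicAndPartition
> >   val offsets = Map(TopicAndPartition("events", 0) -> 42L,
> >                     TopicAndPartition("events", 1) -> 17L)
> >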
> > regards
> > Sunita
> >
> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarvind@gmail.com>
> > wrote:
> >>
> >> Hello Experts,
> >>
> >> I am trying to use the save-to-ZK design. I just saw Sudhir's comment
> >> that it is an old approach. Any reasons for that? Any issues observed
> >> with saving to ZK? The way we are planning to use it is:
> >> 1. Following
> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
> >> (roughly the pattern sketched below)
> >> 2. Saving to the same file with the offsetRange as a part of the file,
> >> hoping that there are no partial writes and that overwriting the
> >> offsetRanges is safe.
> >>
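> >> For context, the read-then-create pattern from that post looks roughly
> >> like this (a sketch; ssc, brokers, and topic are illustrative names, and
> >> offsetsStore is the one from the library):
> >>
> >> import kafka.message.MessageAndMetadata
> >> import kafka.serializer.StringDecoder
> >> import org.apache.spark.streaming.kafka.KafkaUtils
> >>
> >> val kafkaParams = Map("metadata.broker.list" -> brokers)
> >> val stream = offsetsStore.readOffsets match {
> >>   case Some(fromOffsets) =>  // resume from the offsets saved in ZK
> >>     KafkaUtils.createDirectStream[String, String, StringDecoder,
> >>       StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
> >>       (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
> >>   case None =>               // first run: no saved offsets yet
> >>     KafkaUtils.createDirectStream[String, String, StringDecoder,
> >>       StringDecoder](ssc, kafkaParams, Set(topic))
> >> }
> >>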
> >> However, I have the below doubts, which I couldn't figure out from the
> >> code here:
> >> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
> >> 1. The brokerId is not part of the OffsetRange. How will just
> >> partitionId:fromOffset stay unique in a cluster with multiple brokers and
> >> multiple partitions per topic?
> >> 2. Do we have to specify the zkPath to include the partitionId? I tried
> >> using the ZooKeeperOffsetsStore as is, and it required me to specify the
> >> partitionId:
> >>
> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
> >>   "/consumers/topics/" + topics + "/0")
> >>
> >> For our use cases it is too limiting to include the partitionId in the
> >> path. To get it to work by automatically detecting the existing
> >> partitions for a given topic, I changed it as below (inspired by
> >> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
> >>
> >> import kafka.common.TopicAndPartition
> >> import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
> >> import scala.collection.mutable
> >>
> >> /**
> >>   * groupID consumer group to get offsets for
> >>   * topic   topic to get offsets for
> >>   * return  mapping of partition to offset, as "partition:offset" pairs
> >>   */
> >> private def getOffsets(groupID: String, topic: String): Option[String] = {
> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
> >>   val offsets = new mutable.HashMap[TopicAndPartition, Long]()
> >>   // Look up the topic's partitions, then read each partition's saved
> >>   // offset, defaulting to 0 when none has been stored yet.
> >>   val partitions = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
> >>   for (partition <- partitions.getOrElse(topic, Seq.empty)) {
> >>     val partitionOffsetPath = topicDirs.consumerOffsetDir + "/" + partition
> >>     val maybeOffset = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
> >>     val offset = maybeOffset.map(_.toLong).getOrElse(0L)
> >>     offsets.put(TopicAndPartition(topic, partition), offset)
> >>   }
> >>   // Serialize as "partition:offset" pairs so readOffsets can parse them back.
> >>   Option(offsets.map { case (tp, o) => s"${tp.partition}:$o" }.mkString(","))
> >> }
> >>
> >> // Read the previously saved offsets from ZooKeeper
> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
> >>
> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
> >>
> >>   val start = System.currentTimeMillis()
> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
> >>   offsetsRangesStrOpt match {
> >>     case Some(offsetsRangesStr) =>
> >>       LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
> >>
> >>       // Parse the "partition:offset" pairs produced by getOffsets.
> >>       val offsets = offsetsRangesStr.split(",")
> >>         .map(s => s.split(":"))
> >>         .map { case Array(partitionStr, offsetStr) =>
> >>           TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong }
> >>         .toMap
> >>
> >>       LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
> >>         (System.currentTimeMillis() - start) + " ms")
> >>       Some(offsets)
> >>     case None =>
> >>       LogHandler.log.info("No offsets found in ZooKeeper. Took " +
> >>         (System.currentTimeMillis() - start) + " ms")
> >>       None
> >>   }
> >> }
> >>
> >> However, I am concerned whether saveOffsets will work well with this
> >> approach. That's when I realized we are not considering brokerIds while
> >> storing offsets, and OffsetRange probably does not carry them either: it
> >> can only provide the topic, partition, and the from and until offsets.
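> >>
> >> For what it's worth, I imagine the saveOffsets counterpart would also key
> >> only by partition, something like this (a sketch, assuming the store
> >> holds zkClient and zkPath; untested):
> >>
> >> import org.apache.spark.rdd.RDD
> >> import org.apache.spark.streaming.kafka.HasOffsetRanges
> >>
> >> // Persist "partition:untilOffset" pairs; no brokerId is needed because
> >> // offsets are scoped to a topic-partition, not to a broker.
> >> override def saveOffsets(rdd: RDD[_]): Unit = {
> >>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >>   val offsetsStr = offsetRanges
> >>     .map(r => s"${r.partition}:${r.untilOffset}")
> >>     .mkString(",")
> >>   ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsStr)
> >> }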
> >>
> >> I am probably missing something very basic; the library probably works
> >> well by itself. Can someone (Cody?) explain?
> >>
> >> Cody, Thanks a lot for sharing your work.
> >>
> >> regards
> >> Sunita
> >>
> >>
> >> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <cody@koeninger.org>
> >> wrote:
> >>>
> >>> See
> >>> https://github.com/koeninger/kafka-exactly-once
> >>>
> >>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed"
> >>> <mdkhajaasmath@gmail.com> wrote:
> >>>>
> >>>> Hi Experts,
> >>>>
> >>>> I am looking for some information on how to achieve zero data loss
> >>>> while working with Kafka and Spark. I have searched online, and
> >>>> different blogs give different answers. Please let me know if anyone
> >>>> has an idea on this.
> >>>>
> >>>> Blog 1:
> >>>>
> >>>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
> >>>>
> >>>>
> >>>> Blog2:
> >>>>
> >>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
> >>>>
> >>>>
> >>>> Blog 1 simply suggests a configuration change with a checkpoint
> >>>> directory, while blog 2 gives details on how to save offsets to
> >>>> ZooKeeper. Can you please help me out with the right approach?
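> >>>>
> >>>> From what I understand, blog 1 boils down to recovering the context
> >>>> from a checkpoint directory, roughly like this (a sketch; createContext
> >>>> and checkpointDir are illustrative names):
> >>>>
> >>>> import org.apache.spark.streaming.StreamingContext
> >>>>
> >>>> // Recover the context (and thus offsets) from the checkpoint on restart.
> >>>> val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
> >>>> ssc.start()
> >>>> ssc.awaitTermination()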
> >>>>
> >>>> Thanks,
> >>>> Asmath
> >>>>
> >>>>
> >>
> >
>
