spark-user mailing list archives

From Neil Maheshwari <>
Subject Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream
Date Sun, 19 Feb 2017 22:55:54 GMT
Thank you! I will look at the repository 

> On Feb 19, 2017, at 2:13 PM, Sam Elamin <> wrote:
> Just doing a bit of research, it seems we've been beaten to the punch: there's already a connector you can use here
> Give it a go and feel free to give the committer feedback, or better yet send some PRs if it needs them :)
>> On Sun, Feb 19, 2017 at 9:23 PM, Sam Elamin <> wrote:
>> Hey Neil 
>> No worries! Happy to help you write it if you want, just link me to the repo and we can write it together
>> Would be fun!
>> Regards 
>> Sam 
>>> On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <>
>>> Thanks for the advice Sam. I will look into implementing a structured streaming
>>>> On Feb 19, 2017, at 11:54 AM, Sam Elamin <>
>>>> Hi Neil,
>>>> My advice would be to write a structured streaming connector. The new structured streaming APIs were brought in to handle exactly the issues you describe
>>>> See this blog
>>>> There isn't a structured streaming connector as of yet, but you can easily write one that uses the underlying batch methods to read/write to Kinesis
>>>> Have a look at how I wrote my BigQuery connector here. Plus the best thing is we get a new connector to a highly used datasource/sink
>>>> Hope that helps
>>>> Regards
>>>> Sam
>>>> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <>
>>>> Thanks for your response Ayan. 
>>>> This could be an option. One complication I see with that approach is that I do not want to miss any records that fall between the data we have batched to the data store and the checkpoint. I would still need a mechanism for recording the sequence number of the last time the data was batched, so I could start the streaming application after that sequence
>>>> A similar approach could be to batch our data periodically, recording the last sequence number of each batch. Then, fetch data from Kinesis using the low-level API to read from the latest sequence number of the batched data up to the sequence number of the latest checkpoint from our Spark app. I could merge the batched dataset with the dataset fetched from Kinesis’s low-level API, and use the result as an RDD to prep the job.
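The gap-filling read described above could be sketched roughly as follows. This is a minimal sketch, not code from the thread: `read_between` and its parameters are hypothetical names, `client` is assumed to behave like a boto3 Kinesis client (only `get_shard_iterator` and `get_records` are used), and it assumes sequence numbers within a single shard can be compared as integers. It handles one shard and does no throttling or retry handling.

```python
from typing import Any, Dict, List


def read_between(client: Any, stream: str, shard_id: str,
                 after_seq: str, upto_seq: str,
                 max_empty_polls: int = 5) -> List[Dict[str, Any]]:
    """Collect records with after_seq < SequenceNumber <= upto_seq from one shard.

    Hypothetical helper; `client` is assumed to be a boto3-style Kinesis client.
    """
    iterator = client.get_shard_iterator(
        StreamName=stream,
        ShardId=shard_id,
        ShardIteratorType="AFTER_SEQUENCE_NUMBER",
        StartingSequenceNumber=after_seq,
    )["ShardIterator"]

    records: List[Dict[str, Any]] = []
    empty_polls = 0
    while iterator is not None and empty_polls < max_empty_polls:
        resp = client.get_records(ShardIterator=iterator, Limit=1000)
        batch = resp["Records"]
        # An empty response does not always mean the shard is exhausted,
        # so tolerate a few empty polls before giving up.
        empty_polls = 0 if batch else empty_polls + 1
        for rec in batch:
            # Assumption: sequence numbers are decimal strings that grow
            # within a shard, so integer comparison orders them.
            if int(rec["SequenceNumber"]) > int(upto_seq):
                return records
            records.append(rec)
        if resp.get("MillisBehindLatest") == 0:
            break  # caught up with the tip of the shard
        iterator = resp.get("NextShardIterator")
    return records
```

The records returned this way would then be merged with the batched dataset before building the initial RDD; a real stream has several shards, so the call would be repeated per shard with that shard's own pair of sequence numbers.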
>>>>> On Feb 19, 2017, at 3:12 AM, ayan guha <> wrote:
>>>>> Hi
>>>>> AFAIK, Kinesis does not provide any mechanism other than the checkpoint to restart. That makes sense, as it keeps it generic.
>>>>> Question: why can't you warm up your data from a data store? Say every 30 mins you run a job to aggregate your data to a data store for that hour. When you restart the streaming app it would read from the Dynamo checkpoint, but it would also prep an initial RDD from the data store?
>>>>> Best
>>>>> Ayan
>>>>> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <>
>>>>> Hello, 
>>>>> I am building a Spark Streaming application that ingests data from an Amazon Kinesis stream. My application keeps track of the minimum price over a window for groups of similar tickets. When I deploy the application, I would like it to start processing at the start of the previous hour's data. This will warm up the state of the application and allow us to deploy our application faster. For example, if I start the application at 3 PM, I would like to process the data retained by Kinesis from 2 PM to 3 PM, and then continue receiving data going forward. Spark Streaming’s Kinesis receiver, which relies on the Amazon Kinesis Client Library, seems to give me three options for choosing where to read from the stream:

>>>>> read from the latest checkpointed sequence number in Dynamo
>>>>> start from the oldest record in the stream (TRIM_HORIZON shard iterator)
>>>>> start from the most recent record in the stream (LATEST shard iterator)
>>>>> Do you have any suggestions on how we could start our application at a specific timestamp or sequence number in the Kinesis stream? Some ideas I had were:
>>>>> Create a KCL application that fetches the previous hour's data and writes it to HDFS. We can create an RDD from that dataset and initialize our Spark Streaming job with it. The Spark Streaming job’s Kinesis receiver can have the same name as the initial KCL application, and use that application's checkpoint as the starting point. We’re writing our Spark jobs in Python, so this would require launching the Java MultiLang daemon, or writing that portion of the application in Java/Scala.
>>>>> Before the Spark Streaming application starts, we could fetch a shard iterator using the AT_TIMESTAMP shard iterator type. We could record the sequence number of the first record returned by this iterator, and create an entry in Dynamo for our application for that sequence number. Our Kinesis receiver would pick up from this checkpoint. It makes me a little nervous that we would be faking Kinesis Client Library's protocol by writing a checkpoint into Dynamo.
>>>>> Thanks in advance!
>>>>> Neil
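The first half of the AT_TIMESTAMP idea above (finding the sequence number of the first record at or after a given time) might look roughly like this. It is a sketch under assumptions rather than anything posted in the thread: `first_sequence_number_at` is a hypothetical helper, `client` is assumed to behave like a boto3 Kinesis client, and only one shard is handled. It deliberately stops short of writing anything into the KCL's Dynamo table, which is the part the original message is wary of.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def first_sequence_number_at(client: Any, stream: str, shard_id: str,
                             start: datetime,
                             max_polls: int = 10) -> Optional[str]:
    """Return the sequence number of the first record at or after `start`.

    Hypothetical helper; `client` is assumed to be a boto3-style Kinesis
    client. Returns None if no record is found within `max_polls` reads.
    """
    iterator = client.get_shard_iterator(
        StreamName=stream,
        ShardId=shard_id,
        ShardIteratorType="AT_TIMESTAMP",
        Timestamp=start,
    )["ShardIterator"]

    for _ in range(max_polls):
        if iterator is None:
            break  # shard is closed and fully read
        resp = client.get_records(ShardIterator=iterator, Limit=1)
        if resp["Records"]:
            return resp["Records"][0]["SequenceNumber"]
        if resp.get("MillisBehindLatest") == 0:
            break  # reached the tip of the shard without finding a record
        iterator = resp.get("NextShardIterator")
    return None


# Example input for the 2 PM to 3 PM case from the message (UTC assumed):
two_pm = datetime(2017, 2, 19, 14, 0, tzinfo=timezone.utc)
```

With a real client, the call would be repeated for every shard in the stream, since each shard has its own sequence numbers and its own checkpoint entry.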
>>>>> -- 
>>>>> Best Regards,
>>>>> Ayan Guha
