flink-user mailing list archives

From Sara Arshad <sara.arshad...@gmail.com>
Subject Re: Question about My Flink Application
Date Tue, 26 May 2020 11:41:47 GMT
Hi Alexander,

Thank you for your reply.
I got a reply from the AWS people. It seems to be a configuration problem.
But even if it works fine without restarting, it's not a good option for
us.
There is no one-to-one relation between cache data and keyed values.
Therefore, it has to send the whole data set to every key every 5 minutes,
and we may have a very large number of keys at the same time.
So I came up with a completely different solution. Now I only keep the
cache in a shared map. It may not be the best design, but it performs
better.

Best regards,
Sara
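
The message does not show how the shared map is implemented. The following is only a minimal sketch of one way to keep a periodically refreshed cache in a shared map, in plain Java. The names `SharedRuleCache` and `loader`, and the use of strings for rules, are hypothetical stand-ins for the real rule type and the DynamoDB read.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: one cache instance shared by all keys in a JVM.
final class SharedRuleCache {
    // Volatile reference to an unmodifiable snapshot, so readers never block.
    private volatile Map<String, String> snapshot = Collections.emptyMap();
    private final Supplier<Map<String, String>> loader;

    SharedRuleCache(Supplier<Map<String, String>> loader) {
        this.loader = loader;
    }

    // Load all rules and swap in the new snapshot with a single write.
    void refresh() {
        snapshot = Collections.unmodifiableMap(new HashMap<>(loader.get()));
    }

    String get(String ruleId) {
        return snapshot.get(ruleId);
    }

    // Refresh now and then every periodSeconds; the caller shuts the executor down.
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "rule-cache-refresh");
            t.setDaemon(true);
            return t;
        });
        ses.scheduleAtFixedRate(() -> {
            try {
                refresh();
            } catch (RuntimeException e) {
                // Keep the previous snapshot; an uncaught exception would stop the schedule.
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return ses;
    }
}
```

A map held this way is not part of Flink's checkpointed state, and each TaskManager JVM holds its own copy, which is presumably the design trade-off mentioned above.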



On Sat, May 23, 2020 at 1:04 PM Alexander Fedulov <alexander@ververica.com>
wrote:

> Returning the discussion to the mailing list (it accidentally went to a
> side channel because of a direct reply).
> What I was referring to is the event-time processing semantics, which are
> based on the watermark mechanism [1].
> If you are using it, the event time at your KeyedBroadcastProcessFunction
> will be determined as the minimum of the maximum watermarks observed
> across all of the input channels. In order not to stall the processing of
> the events of the main data flow by the control channel (broadcast stream),
> you could set its watermark to the maximum possible value, as shown in
> this example [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
> [2]
> https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/main/java/com/ververica/field/dynamicrules/sources/RulesSource.java#L80
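
The minimum rule described above can be illustrated with a small model in plain Java. This is not Flink code: `WatermarkMin` and `operatorWatermark` are hypothetical names, and an input that has not emitted any watermark yet is modeled as `Long.MIN_VALUE`. The only Flink fact relied on is that `Watermark.MAX_WATERMARK` carries the timestamp `Long.MAX_VALUE`.

```java
import java.util.Arrays;

// Illustration only: models how a two-input operator derives its event time.
final class WatermarkMin {
    // Same timestamp that Flink's Watermark.MAX_WATERMARK carries.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // The operator's event time is the minimum of the latest watermark
    // observed on each input channel.
    static long operatorWatermark(long... latestPerInput) {
        return Arrays.stream(latestPerInput).min().orElse(Long.MIN_VALUE);
    }
}
```

With a data input at 1000 and a broadcast input that never emitted a watermark, `operatorWatermark(1000L, Long.MIN_VALUE)` stays at `Long.MIN_VALUE`, so event-time timers never fire. With the broadcast input set to `MAX_WATERMARK`, the result is 1000, so only the data input determines the operator's event time.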
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
>
>
> On Sat, May 23, 2020 at 1:05 AM Sara Arshad <sara.arshad.86@gmail.com>
> wrote:
>
>> - It was based on something I read about broadcast.
>> Besides, as I mentioned before, the restart happens while it is triggering
>> checkpoints.
>> - When I send the stream, it is processed perfectly fine between restarts.
>> - Yes, I am using ProcessingTimeService in the cache source to make it
>> fetch data every 300 seconds.
>> Do you think this would be doable with a stream of a million messages
>> if I improve my implementation?
>>
>> Best regards,
>> Sara
>>
>> On Fri, May 22, 2020 at 6:22 PM Alexander Fedulov <
>> alexander@ververica.com> wrote:
>>
>>> OK, with such data sizes this should definitely be doable with a
>>> broadcast channel.
>>> "The problem was that the broadcast puts a lot of pressure on
>>> checkpointing." - is this the evaluation of the AWS support? Do you have
>>> any details as to why this is considered to be the case?
>>> "Even before I start to send the Kinesis stream it stuck." - so do you
>>> actually see any data output or nothing is happening and 20 minutes later
>>> the job crashes?
>>> Are you using event time processing semantics in your pipeline?
>>>
>>> On Fri, May 22, 2020 at 4:34 PM Sara Arshad <sara.arshad.86@gmail.com>
>>> wrote:
>>>
>>>> Hi Alexander,
>>>>
>>>> It's not that much data. I have only 2 records in my DynamoDB table right
>>>> now (later it may be around 100 records, which is still not much) and I
>>>> update the whole data set every 300 seconds.
>>>> Even before I start to send the Kinesis stream it stuck.
>>>> Yes, I can see the checkpoint size is around 150k. But in some cases,
>>>> when I sent a Kinesis stream of 80 messages, it was around 190k.
>>>> The maximum checkpoint duration is 670.
>>>>
>>>> Regards,
>>>>
>>>>
>>>> On Fri, 22 May 2020, 4:15 pm Alexander Fedulov, <
>>>> alexander@ververica.com> wrote:
>>>>
>>>>> Hi Sara,
>>>>>
>>>>> what is the volume of data that is coming in through the broadcast
>>>>> channel every 30 seconds? Do you only insert modified rules entries or
>>>>> all of them on each update?
>>>>> Do you have access to metrics? Specifically, the size of the
>>>>> checkpoints and time distribution of different checkpoint phases are
>>>>> of interest.
>>>>>
>>>>> Best,
>>>>>
>>>>> On Fri, May 22, 2020 at 3:57 PM Sara Arshad <sara.arshad.86@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> The problem was that the broadcast puts a lot of pressure on
>>>>>> checkpointing.
>>>>>> I have to find another solution.
>>>>>> If you have any other solution please let me know.
>>>>>>
>>>>>> Regards,
>>>>>> Sara
>>>>>>
>>>>>> On Wed, 20 May 2020, 5:55 pm Sara Arshad, <sara.arshad.86@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> That was the broadcast stream, which is supposed to behave like a
>>>>>>> cache.
>>>>>>> Then I connect that one to the Kinesis stream, as in the code below.
>>>>>>> Also, I have two sink functions to send the results to another
>>>>>>> DynamoDB table or to CloudWatch, based on the output type.
>>>>>>> Does this make sense, or do you have another idea?
>>>>>>>
>>>>>>> DataStreamSource<MyRule> ruleDataStreamSource = env.addSource(new DynamoDBSource()).setParallelism(1);
>>>>>>>
>>>>>>> MapStateDescriptor<String, MyRule> ruleStateDescriptor = new MapStateDescriptor<>(
>>>>>>>         "RulesBroadcastState",
>>>>>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>>>>>         TypeInformation.of(new TypeHint<MyRule>() {
>>>>>>>         }));
>>>>>>>
>>>>>>> BroadcastStream<MyRule> ruleBroadcastStream = ruleDataStreamSource
>>>>>>>         .broadcast(ruleStateDescriptor);
>>>>>>>
>>>>>>> SingleOutputStreamOperator<WindowFunctionOutput> process = env.addSource(ObjectFactory.getKinesisConsumer())
>>>>>>>         .keyBy(new KeySelector<Message, Tuple2<Identifier, Integer>>() {
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public Tuple2<Identifier, Integer> getKey(Message value) throws Exception {
>>>>>>>                 return Tuple2.of(value.getIdentifier(), value.getServiceType());
>>>>>>>             }
>>>>>>>
>>>>>>>         })
>>>>>>>         .connect(ruleBroadcastStream)
>>>>>>>         .process(new BroadcastWindowFunction());
>>>>>>>
>>>>>>> DataStreamSink<OutputX> blockingStrategyOutputDataStreamSink = process
>>>>>>>         .filter(output -> OutputX.class.isAssignableFrom(output.getClass()))
>>>>>>>         .map(output -> (OutputX) output)
>>>>>>>         .addSink(new DynamoDBSink());
>>>>>>>
>>>>>>>
>>>>>>> DataStreamSink<OutputY> metricsOutputDataStreamSink = process
>>>>>>>         .filter(output -> OutputY.class.isAssignableFrom(output.getClass()))
>>>>>>>         .map(output -> (OutputY) output)
>>>>>>>         .addSink(new CloudWatchMetricsSink());
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 20, 2020 at 5:45 PM Alexander Fedulov <
>>>>>>> alexander@ververica.com> wrote:
>>>>>>>
>>>>>>>> I did not notice that you are actually running a Kinesis Analytics
>>>>>>>> job, without access to the machines, sorry. In this case, without any
>>>>>>>> errors in the logs, I think there is not much that we can do without
>>>>>>>> the AWS support team looking into it.
>>>>>>>> Nonetheless, one thing I was wondering about is whether you
>>>>>>>> necessarily need to have a custom DynamoDBSource to fetch rules
>>>>>>>> periodically. How about directly connecting to the stream of DynamoDB
>>>>>>>> updates and getting everything in real time [1]? This would
>>>>>>>> remove one moving part that, as I see, you suspect to be a potential
>>>>>>>> source of errors.
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4582
>>>>>>>>
>>>>>>>> On Wed, May 20, 2020 at 4:38 PM Sara Arshad <
>>>>>>>> sara.arshad.86@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am using Kinesis Analytics and the CloudWatch dashboard for logs.
>>>>>>>>> There is no error-level log.
>>>>>>>>> It logs things like 'Triggering checkpoint 187 @ 1589984558975
>>>>>>>>> for job 7998aca8913b33a090bc5c0f43168bd5.' and then suddenly it
>>>>>>>>> restarts.
>>>>>>>>> I know these are very general, but I really don't know what's going
>>>>>>>>> on.
>>>>>>>>> I also asked AWS support. They haven't replied yet.
>>>>>>>>>
>>>>>>>>> This is my broadcast stream source:
>>>>>>>>>
>>>>>>>>> public class DynamoDBSource extends
>>>>>>>>> RichParallelSourceFunction<MyRule> implements CheckpointedFunction,
>>>>>>>>> ProcessingTimeCallback {
>>>>>>>>>
>>>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>>>
>>>>>>>>>     private volatile boolean isRunning = true;
>>>>>>>>>
>>>>>>>>>     ListStateDescriptor<MyRule> ruleStateDescriptor = new ListStateDescriptor<>(
>>>>>>>>>             "RulesBroadcastState",
>>>>>>>>>             TypeInformation.of(new TypeHint<MyRule>() {
>>>>>>>>>             }));
>>>>>>>>>
>>>>>>>>>     private volatile Boolean sendData = true;
>>>>>>>>>     private transient ListState<MyRule> listState;
>>>>>>>>>
>>>>>>>>>     private transient ProcessingTimeServiceInf
>>>>>>>>> processingTimeService;
>>>>>>>>>
>>>>>>>>>     private static long rulesUpdateIntervalMillis;
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>>>>>
>>>>>>>>>         //...
>>>>>>>>>
>>>>>>>>>         processingTimeService = new ProcessingTimeServiceImpl();
>>>>>>>>>
>>>>>>>>>         long currentProcessingTime =
>>>>>>>>> processingTimeService.getCurrentProcessingTime();
>>>>>>>>>
>>>>>>>>>         // rulesUpdateIntervalMillis = some static value from the config class
>>>>>>>>>
>>>>>>>>>         processingTimeService.registerTimer(currentProcessingTime,
>>>>>>>>> this);
>>>>>>>>>
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>>>>>>>
>>>>>>>>>         Preconditions.checkState(this.listState != null,
>>>>>>>>>                 "The " + getClass().getSimpleName() + " has not been properly initialized.");
>>>>>>>>>
>>>>>>>>>         this.listState.clear();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void initializeState(FunctionInitializationContext
>>>>>>>>> context) throws Exception {
>>>>>>>>>
>>>>>>>>>         Preconditions.checkState(this.listState == null,
>>>>>>>>>                 "The " + getClass().getSimpleName() + " has already been initialized.");
>>>>>>>>>
>>>>>>>>>         this.listState =
>>>>>>>>> context.getOperatorStateStore().getListState(ruleStateDescriptor);
>>>>>>>>>
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void run(SourceContext<MyRule> ctx) throws Exception {
>>>>>>>>>
>>>>>>>>>         while (isRunning) {
>>>>>>>>>
>>>>>>>>>             synchronized (sendData) {
>>>>>>>>>
>>>>>>>>>                 if (sendData) {
>>>>>>>>>
>>>>>>>>>                     for (MyRule rule : listState.get()) {
>>>>>>>>>
>>>>>>>>>                         ctx.collect(rule);
>>>>>>>>>                     }
>>>>>>>>>                     sendData = false;
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void cancel() {
>>>>>>>>>         this.isRunning = false;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void onProcessingTime(long timestamp) throws Exception {
>>>>>>>>>
>>>>>>>>>         readRulesFromDB();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> processingTimeService.registerTimer(processingTimeService.getCurrentProcessingTime()
>>>>>>>>> + rulesUpdateIntervalMillis, this);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private synchronized void readRulesFromDB() {
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>          ...
>>>>>>>>>         this.sendData = true;
>>>>>>>>>     }
>>>>>>>>> }
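
Two details of the `run()` loop above are worth noting regardless of the restart issue, whose cause is not established in this thread. `synchronized (sendData)` locks on a boxed `Boolean` whose reference is reassigned, so threads can end up locking different objects, and the loop spins without pausing while there is nothing to send. The following is only a sketch of one alternative in plain Java. `FlaggedEmitter` and its methods are hypothetical and not part of the Flink API.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the emit loop of a polling source.
final class FlaggedEmitter<T> {
    private final AtomicBoolean sendData = new AtomicBoolean(false);
    private volatile boolean running = true;
    private volatile List<T> latest = Collections.emptyList();

    // Called by the timer thread after a fresh read from the database.
    void publish(List<T> items) {
        latest = items;
        sendData.set(true);
    }

    // One loop iteration: emits the latest items if the flag is set.
    // Returns true if something was emitted.
    boolean emitIfFlagged(Consumer<T> out) {
        if (!sendData.compareAndSet(true, false)) {
            return false;
        }
        latest.forEach(out);
        return true;
    }

    void run(Consumer<T> out, long idleMillis) throws InterruptedException {
        while (running) {
            if (!emitIfFlagged(out)) {
                Thread.sleep(idleMillis); // avoid spinning at full CPU while idle
            }
        }
    }

    void cancel() {
        running = false;
    }
}
```

In an actual `SourceFunction`, `out` would correspond to `ctx.collect`, and the emission would typically happen while holding `ctx.getCheckpointLock()`.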
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, May 20, 2020 at 4:10 PM Alexander Fedulov <
>>>>>>>>> alexander@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> We'd need more details to localize the problem. What are the last
>>>>>>>>>> things printed before the restart? Are there any actual error-level
>>>>>>>>>> logs there? Do you happen to find any JVM crash files
>>>>>>>>>> (hs_err_pidXXXX.log) on your Flink machines?
>>>>>>>>>>
>>>>>>>>>> On Wed, May 20, 2020 at 4:01 PM Sara Arshad <
>>>>>>>>>> sara.arshad.86@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you for your response.
>>>>>>>>>>> I get
>>>>>>>>>>> Error when creating PropertyDescriptor for public final void
>>>>>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty...
>>>>>>>>>>> quite a lot. But it is logged at INFO level, and I also had it
>>>>>>>>>>> before adding the broadcast.
>>>>>>>>>>> I also retried an older version of my application and it still
>>>>>>>>>>> works fine.
>>>>>>>>>>> By the way, the scenario works fine between restarts.
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 20, 2020 at 3:44 PM Alexander Fedulov <
>>>>>>>>>>> alexander@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Sara,
>>>>>>>>>>>>
>>>>>>>>>>>> do you have logs? Any exceptions in them?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, May 19, 2020 at 11:28 PM Sara Arshad <
>>>>>>>>>>>> sara.arshad.86@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have been using Flink with Kinesis Analytics.
>>>>>>>>>>>>> I have a stream of data, and I also need a cache which I update
>>>>>>>>>>>>> every 300 seconds.
>>>>>>>>>>>>> To share the cache data with the Kinesis stream elements, I
>>>>>>>>>>>>> used a broadcast stream: I implemented a SourceFunction which gets
>>>>>>>>>>>>> the data from the DB and broadcasts it to the next operator, which
>>>>>>>>>>>>> is a KeyedBroadcastProcessFunction.
>>>>>>>>>>>>> But after adding the broadcast stream (in the previous version
>>>>>>>>>>>>> I didn't have a cache and I was using a KeyedProcessFunction for
>>>>>>>>>>>>> the Kinesis stream), when I execute it in Kinesis Analytics, it
>>>>>>>>>>>>> keeps restarting about every 20 minutes.
>>>>>>>>>>>>> Could you please help me figure out what the issue could be?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Sara Arshad
>>>>>>>>>>>>>
>>>>>>>>>>>>