flink-user mailing list archives

From Sujit Sakre <sujit.sa...@northgateps.com>
Subject Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line
Date Tue, 24 Jan 2017 17:53:27 GMT
Hi Aljoscha,

Thanks.

Yes, we are using event time.
Yes, the Flink program is kept running in the IDE (Eclipse) and is not closed
after the first batch of events, when the second batch is added.
Yes, we do have a custom timestamp/watermark assigner, implemented as
BoundedOutOfOrdernessGenerator2.


Are we using the Kafka properties correctly?
We are using Flink 1.1.1 and the Flink Kafka connector
flink-connector-kafka-0.9_2.11.
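For reference, here is a minimal sketch of how the consumer properties could look for the 0.9 connector, under our current understanding. The 0.9 consumer only needs bootstrap.servers (the Flink documentation lists zookeeper.connect as required only for Kafka 0.8), and auto.offset.reset applies only when the consumer group has no committed offset, so with a fixed group.id a restarted job resumes from the committed offsets rather than from the start of the topic. The helper name kafka09Props and the random-group-id switch are hypothetical, intended for testing only.

```java
import java.util.Properties;
import java.util.UUID;

public class KafkaPropsSketch {

    // Hypothetical helper: builds consumer properties for the Kafka 0.9 client.
    // zookeeper.connect is omitted because only the 0.8 connector needs it.
    static Properties kafka09Props(String bootstrapServers, boolean freshGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // auto.offset.reset is only consulted when the group has no committed
        // offset. A fresh random group.id per run therefore starts from the
        // earliest offset; a fixed group.id resumes from committed offsets.
        String groupId = freshGroup ? "demo-" + UUID.randomUUID() : "demo";
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = kafka09Props("localhost:9092", true);
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

The resulting Properties object would be passed to FlinkKafkaConsumer09 exactly as kafkaProps is in the outline below.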

More about the behavior:
I have noticed that sometimes, even after the first write to the Kafka
queue, the running Flink program does not process the queue immediately,
and we need to restart it. This happens quite randomly.

Following is the rough outline of our code.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;

public class SlidingWindow2 {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "demo");
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
                new FlinkKafkaConsumer09<>(
                        "test",            // Kafka topic name
                        new dataSchema(),  // our deserialization schema (not shown)
                        kafkaProps);

        DataStream<Tuple5<String, String, Float, Float, String>> stream1 =
                env.addSource(consumer);

        DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
                stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());

        // key by the fifth tuple field (index 4); 6-minute windows sliding every 2 minutes
        keyedStream.keyBy(4)
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction());

        env.execute("Sliding Event Time Window Processing");
    }

    public static class CustomSlidingWindowFunction implements
            WindowFunction<Tuple5<String, String, Float, Float, String>,
                    Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple key, TimeWindow window,
                          Iterable<Tuple5<String, String, Float, Float, String>> input,
                          Collector<Tuple5<String, String, Float, Float, String>> out)
                throws Exception {

            ....
        }
    }

    // Implemented custom periodic watermark assigner as below
    public static class BoundedOutOfOrdernessGenerator2 implements
            AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {

        private static final long serialVersionUID = 1L;

        private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Tuple5<String, String, Float, Float, String> element,
                                     long previousElementTimestamp) {
            //System.out.println("inside extractTimestamp");
            Date parseDate = null;
            SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
            try {
                parseDate = dateFormat.parse(element.f0);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            long timestamp = parseDate.getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as twice the current highest timestamp minus the
            // out-of-orderness bound
            // this is because it is not covering the lateness sufficiently; now it does
            // in future this may be multiple of 3 or more if necessary to cover the gap
            // in records received
            return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
        }
    }
}
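A quick arithmetic check of the watermark formula in BoundedOutOfOrdernessGenerator2, as a plain-Java sketch. Date.getTime() returns milliseconds since the epoch, so for a January 2017 event currentMaxTimestamp * 2 lands at roughly the year 2064, far ahead of any later event; the constant is also described as seconds while the timestamps are in milliseconds. The sketch assumes zero allowed lateness and no window offset, treats a window as finished once the watermark reaches its last millisecond (end - 1), and uses hypothetical timestamps and a hypothetical MAX_EVENT_DELAY of 10. It is a simplified model, not Flink's own window operator code.

```java
public class WatermarkCheck {

    // Conventional bounded-out-of-orderness watermark: highest timestamp seen
    // minus the allowed delay (both in milliseconds).
    static long boundedWatermark(long maxTimestampMs, long maxDelayMs) {
        return maxTimestampMs - maxDelayMs;
    }

    // The formula used in BoundedOutOfOrdernessGenerator2.
    static long doubledWatermark(long maxTimestampMs, long maxDelay) {
        return maxTimestampMs * 2 - maxDelay;
    }

    // Simplified lateness check for sliding windows [start, start + size),
    // assuming ts >= 0, no window offset and zero allowed lateness.
    // The window starting at the last slide boundary <= ts ends latest; if even
    // that window's last millisecond is covered by the watermark, every window
    // the element belongs to has already fired and the element is dropped.
    static boolean isLate(long ts, long sizeMs, long slideMs, long watermark) {
        long lastStart = ts - (ts % slideMs);
        long lastWindowMaxTimestamp = lastStart + sizeMs - 1;
        return lastWindowMaxTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long sizeMs = 6 * 60 * 1000L;  // Time.minutes(6)
        long slideMs = 2 * 60 * 1000L; // Time.minutes(2)
        long maxDelay = 10;            // hypothetical MAX_EVENT_DELAY, "in seconds"

        // Hypothetical timestamps: last event of the first batch, and an event
        // from the second batch one minute later.
        long t1 = 1_485_280_000_000L;  // about 24 Jan 2017, epoch milliseconds
        long t2 = t1 + 60_000L;

        long doubled = doubledWatermark(t1, maxDelay);
        long bounded = boundedWatermark(t1, maxDelay * 1000L); // seconds -> ms

        System.out.println("doubled watermark, late? " + isLate(t2, sizeMs, slideMs, doubled));
        System.out.println("bounded watermark, late? " + isLate(t2, sizeMs, slideMs, bounded));
    }
}
```

Running it prints "doubled watermark, late? true" followed by "bounded watermark, late? false". With the doubled formula, an event arriving one minute after the first batch falls only into windows that have already ended, which would be consistent with the second batch producing no output; with the conventional formula it is still on time.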





*Sujit Sakre*



On 24 January 2017 at 22:34, Aljoscha Krettek <aljoscha@apache.org> wrote:

> Hi,
> a bit more information would be useful. Are you using event-time? Is the
> Flink program kept running after adding the first batch of events and when
> adding the second batch, or is it two invocations of your Flink program? Do
> you have a custom timestamp/watermark assigner?
>
> Cheers,
> Aljoscha
>
> On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <sujit.sakre@northgateps.com>
> wrote:
>
>> Hi,
>>
>> We are using a sliding window function to process data read from a Kafka
>> stream. We are using FlinkKafkaConsumer09 to read the data. The window
>> function and sink are running correctly.
>>
>> To test the program, we are generating a stream of data from the command
>> line. This works when we add a set of records once. When we add records
>> again, it does not work: Flink produces no result, even though the records
>> are added to the same Kafka topic from the same command-line instance.
>>
>> Please could you suggest what could be wrong.
>>
>> Many thanks.
>>
>>
>> *Sujit Sakre*
>>
>> This email is sent on behalf of Northgate Public Services (UK) Limited
>> and its associated companies including Rave Technologies (India) Pvt
>> Limited (together "Northgate Public Services") and is strictly confidential
>> and intended solely for the addressee(s).
>> If you are not the intended recipient of this email you must: (i) not
>> disclose, copy or distribute its contents to any other person nor use its
>> contents in any way or you may be acting unlawfully;  (ii) contact
>> Northgate Public Services immediately on +44(0)1908 264500
>> quoting the name of the sender and the addressee
>> then delete it from your system.
>> Northgate Public Services has taken reasonable precautions to ensure that
>> no viruses are contained in this email, but does not accept any
>> responsibility once this email has been transmitted.  You should scan
>> attachments (if any) for viruses.
>>
>> Northgate Public Services (UK) Limited, registered in England and Wales
>> under number 00968498 with a registered address of Peoplebuilding 2,
>> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
>> 4NN.  Rave Technologies (India) Pvt Limited, registered in India under
>> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
>> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
>>
>

