flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Tumbling Windows with Processing Time
Date Wed, 03 Feb 2016 12:39:16 GMT
Do you have 7 distinct keys? You get as many result tuples as you have
keys, because the window is per key.
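[Editorial aside, not part of the original message: one plausible source of exactly 7 keys is the key selector in the quoted code below, `hashCode % parallelism`. Scala's `%`, like Java's, keeps the sign of the dividend, so with parallelism 4 a negative hash code produces a negative remainder and the expression can take any of 7 values:]

```scala
// Sketch: % keeps the sign of the dividend, so hashCode % 4 can
// produce any of seven values (-3 .. 3) once hash codes go negative.
val hashes = Seq(-7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3)
val keys = hashes.map(_ % 4).distinct.sorted
println(keys) // List(-3, -2, -1, 0, 1, 2, 3): 7 distinct keys, not 4
```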

On Wed, Feb 3, 2016 at 12:12 PM, yutao sun <yutao.sun.fr@gmail.com> wrote:

> Thanks for your help. I retested with object reuse disabled and got the
> same result (please see the attached picture).
>
> [image attachment]
>
>
> 2016-02-03 10:51 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>
>> The definition looks correct.
>> Because the windows are by-key, you should get one window result per key
>> per second.
>>
>> Can you turn off object reuse? That is a pretty experimental feature: it
>> works quite well with batch operations, but not yet with streaming
>> windows.
>> I would only enable object reuse once the program runs correctly without
>> it.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yutao.sun.fr@gmail.com> wrote:
>>
>>> Hi Flink users,
>>>
>>> I have a question about tumbling windows using processing time in Flink
>>> 0.10.1:
>>>
>>> I want to measure the throughput of my application. The idea is, at the
>>> last operator, to count the messages received using a tumbling
>>> processing-time window with a size of 1 second.
>>>
>>> The problem is that with a parallelism of 4, the number of windows
>>> should be 4 per second, but I got 7 windows per second. I wonder whether
>>> there is an error in how the window is defined?
>>>
>>> I copy my code here; thanks a lot in advance for your help.
>>> [Kafka partitions: 4]
>>>
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> val parallelism = 4
>>>
>>> env.setParallelism(parallelism)
>>> env.getConfig.enableObjectReuse()
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>> env.getConfig.setAutoWatermarkInterval(-1L)
>>> env.getConfig.disableTimestamps()
>>>
>>> env.addSource(
>>>     new FlinkKafkaConsumer082[String](
>>>       "test_topic",
>>>       new SimpleStringSchema,
>>>       properties // for connecting to Kafka
>>>     ))
>>>   .rebalance
>>>   .map(/* do something */)
>>>   .map(payload => (payload, 1L))
>>>   .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)
>>>   .timeWindow(Time.of(1, TimeUnit.SECONDS))
>>>   .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) =>
>>>     (tuple0._1, tuple0._2 + tuple1._2))
>>>   .addSink(
>>>     new FlinkKafkaProducer[(Payload, Long)](
>>>       KafkaBootstrapServers,
>>>       TARGET_TOPIC,
>>>       new SerializationSchema[(Payload, Long), Array[Byte]] {
>>>         override def serialize(element: (Payload, Long)): Array[Byte] =
>>>           element._2.toString.getBytes
>>>       }
>>>     ))
>>>
>>> env.execute("test")
>>>
>>>
>>>
>>>
>>
>
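[Editorial aside, not a fix proposed in the thread: if the intent is to land on exactly `parallelism` distinct keys, one sketch is to normalize the possibly negative hash code into the range `0 until parallelism` before keying. The helper name `normalizedKey` is hypothetical:]

```scala
// Sketch: map a possibly-negative hash code into 0 until parallelism.
val parallelism = 4
def normalizedKey(hash: Int): Int =
  ((hash % parallelism) + parallelism) % parallelism

val sampleHashes = Seq(-3, -2, -1, 0, 1, 2, 3)
println(sampleHashes.map(normalizedKey).distinct.sorted) // List(0, 1, 2, 3)
```

In the quoted program this would replace the key selector, e.g. `.keyBy(p => normalizedKey(p._1.id.hashCode))`; `Math.floorMod` achieves the same on Java 8+. Note that even with exactly `parallelism` keys, Flink assigns keys to subtasks by its own hash partitioning, so the keys are not guaranteed to spread one per subtask.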
