beam-user mailing list archives

From <linr...@itri.org.tw>
Subject RE: The problem of kafkaIO sdk for data latency
Date Mon, 05 Mar 2018 02:57:50 GMT
Hi Raghu,

I upgraded my Beam version from 2.0.0 to 2.3.0, and now it works well.

Thanks for your help.

Now, I have another question about data size in my window.

I am trying to get a fixed number of elements in each window (i.e., exactly 100 elements per window).

However, I sometimes see pane sizes like {10, 40, 70, 100, 150, …}.

If anyone can suggest how to set up my window to achieve this, I would appreciate it.

My window is set up as follows:

Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterPane.elementCountAtLeast(100)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes()

My ideal windowing is:

|window time=1s||window time=1s||window time=1s||window time=1s|
| data size=100 || data size=100 || data size=100 || data size=100 |

First trigger firing:  [1, 2, …, 100]
Second trigger firing: [101, 102, …, 200]
Third trigger firing:  [201, 202, …, 300]
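If the goal is exactly 100 elements per pane regardless of event time, one option is to trigger on element count instead of the watermark. A minimal sketch under that assumption, using the Beam 2.x Java SDK (input stands in for the upstream PCollection<KV<String, String>>):

```java
// Sketch: fire a pane for every 100 buffered elements, independent of event time.
// Repeatedly.forever(...) re-arms the trigger after each firing, and
// discardingFiredPanes() keeps panes disjoint, so each pane holds 100 elements.
PCollection<KV<String, String>> batched = input.apply(
    Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
```

With a FixedWindows setup like the one above, pane sizes vary because the watermark, not the element count, decides when the on-time pane fires.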

Thanks

Rick


From: Raghu Angadi [mailto:rangadi@google.com]
Sent: Saturday, March 03, 2018 5:52 AM
To: user <user@beam.apache.org>
Cc: 林良憲 <linrick@itri.org.tw>
Subject: Re: The problem of kafkaIO sdk for data latency

I recently noticed that DirectRunner was leaking readers eventually crashing my pipeline.
It is fixed in master (PR 4658, https://github.com/apache/beam/pull/4658; version 2.4.0-SNAPSHOT).
Can you try that? In my case the pipeline ran out of file descriptors.

Note that DirectRunner is not particularly optimized for runtime performance. It is often
used for initial testing. Since the performance is alright for you initially, trying it out
with master might help.

Note that TimestampedValue<> does not actually change the timestamp of the event. KafkaIO
uses processing time for event time by default. Please see JavaDoc for KafkaIO for more options.
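To make a parsed timestamp the element's actual event time, the DoFn can emit with ProcessContext.outputWithTimestamp rather than wrapping the value in TimestampedValue. A minimal sketch (parseEventTime is a hypothetical helper standing in for the JSON parsing in the original code):

```java
// Sketch: set the element's event time explicitly.
// outputWithTimestamp(...) changes the event time; TimestampedValue.of(...)
// only wraps the value and leaves the original timestamp in place.
@ProcessElement
public void processElement(ProcessContext c) {
    Instant eventTime = parseEventTime(c.element().getValue()); // hypothetical helper
    c.outputWithTimestamp(c.element(), eventTime);
}
```

Note that emitting a timestamp earlier than the input element's timestamp requires overriding DoFn.getAllowedTimestampSkew(); otherwise the runner rejects it.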

On Wed, Feb 28, 2018 at 6:59 PM <linrick@itri.org.tw> wrote:
Dear all,

I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner).

While using this SDK, I run into a data latency situation, described below.

The data come from Kafka at a fixed rate: 100 elements per second.

I create a 1-second fixed window with no delay. I found that the window size is 70, 80, 104, or sometimes more than 104 elements.

After running for one day, data latency appears, and each window contains only about 10 elements.

To explain this clearly, my code is provided below.
PipelineOptions readOptions = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(readOptions);

PCollection<TimestampedValue<KV<String, String>>> readData =
    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("127.0.0.1:9092")
            .withTopic("kafkasink")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
     .apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
         @ProcessElement
         public void test(ProcessContext c) throws ParseException {
             String element = c.element().getValue();
             try {
                 JsonNode arrNode = new ObjectMapper().readTree(element);
                 String t = arrNode.path("v").findValue("Timestamp").textValue();
                 DateTimeFormatter formatter =
                     DateTimeFormatter.ofPattern("MM/dd/uuuu HH:mm:ss.SSSS");
                 LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
                 java.time.Instant javaInstant =
                     dateTime.atZone(ZoneId.systemDefault()).toInstant();
                 Instant timestamp = new Instant(javaInstant.toEpochMilli());
                 c.output(TimestampedValue.of(c.element(), timestamp));
             } catch (JsonGenerationException e) {
                 e.printStackTrace();
             } catch (JsonMappingException e) {
                 e.printStackTrace();
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
     }));

PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(
    Window.<TimestampedValue<KV<String, String>>>into(
            FixedWindows.of(Duration.standardSeconds(1)).withOffset(Duration.ZERO))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

In addition, the running output is shown below.

data-size=104
coming-data-time=2018-02-27 02:00:49.117
window-time=2018-02-27 02:00:49.999

data-size=70
coming-data-time=2018-02-27 02:00:50.318
window-time=2018-02-27 02:00:50.999

data-size=104
coming-data-time=2018-02-27 02:00:51.102
window-time=2018-02-27 02:00:51.999

After one day:
data-size=10
coming-data-time=2018-02-28 02:05:48.217
window-time=2018-03-01 10:35:16.999

To reproduce my situation, my running environment is:
OS: Ubuntu 14.04.3 LTS

JAVA: JDK 1.7

Beam 2.0.0 (with Direct runner)

Kafka 2.10-0.10.1.1

Maven 3.5.0, with the following dependencies in pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.0.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>
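As noted in the reply above, the reader leak is fixed in newer releases; when upgrading, all three Beam artifacts should stay on the same version. One way to keep them in sync is a shared Maven property (a sketch):

```xml
<properties>
  <beam.version>2.3.0</beam.version>
</properties>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>${beam.version}</version>
</dependency>
```

The other two Beam artifacts would reference ${beam.version} the same way.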

If you have any ideas about this data latency problem, I look forward to hearing from you.

Thanks

Rick


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。
This email may contain confidential information. Please do not use or disclose it in any way
and delete it if you are not the intended recipient.

