flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1967) Introduce (Event)time in Streaming
Date Mon, 13 Jul 2015 19:50:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625219#comment-14625219 ]

ASF GitHub Bot commented on FLINK-1967:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/906#discussion_r34503537
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java ---
    @@ -20,26 +20,35 @@
     
     import java.io.IOException;
     
    -import org.apache.flink.api.common.ExecutionConfig;
    -import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import com.google.common.base.Preconditions;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +
    +/**
    + * Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal
    + * {@link TypeSerializer}, instead, this is only used at the
    + * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting
    + * {@link StreamRecord StreamRecords} and {@link Watermark Watermarks}. This serializer
    + * can handle both of them, therefore it returns {@link Object} the result has
    + * to be cast to the correct type.
    + *
    + * @param <T> The type of value in the {@link StreamRecord}
    + */
    +public final class StreamRecordSerializer<T> extends TypeSerializer<Object> {
     
    -public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
    +	private final long IS_WATERMARK = Long.MIN_VALUE;
    --- End diff --
    
    This is used as a special timestamp. When the deserializer reads it, it knows that
    the following long is a watermark. That's how the StreamRecordSerializer can be used for
    both watermarks and records. I'll add a comment.
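
    The sentinel-timestamp idea described above can be sketched roughly as follows. This is
    only an illustration, not the code from the pull request: it uses plain java.io streams
    instead of Flink's DataInputView/DataOutputView, and Record, Mark and roundTrip are
    hypothetical stand-ins for StreamRecord, Watermark and the real serializer plumbing. The
    wire layout (timestamp then value for a record, sentinel then timestamp for a watermark)
    is an assumption inferred from the comment, since the quoted diff does not show it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class SentinelSerializerSketch {

    // Reserved timestamp value: a real record must never carry it.
    static final long IS_WATERMARK = Long.MIN_VALUE;

    // Hypothetical stand-in for StreamRecord<String>.
    static final class Record {
        final long timestamp;
        final String value;
        Record(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Hypothetical stand-in for Watermark.
    static final class Mark {
        final long timestamp;
        Mark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // Writes either element kind; the first long tells the reader which one follows.
    static void serialize(Object element, DataOutputStream out) throws IOException {
        if (element instanceof Mark) {
            out.writeLong(IS_WATERMARK);               // sentinel in the timestamp slot
            out.writeLong(((Mark) element).timestamp); // the actual watermark time
        } else if (element instanceof Record) {
            Record record = (Record) element;
            if (record.timestamp == IS_WATERMARK) {
                throw new IllegalArgumentException("timestamp collides with the sentinel");
            }
            out.writeLong(record.timestamp);
            out.writeUTF(record.value);
        } else {
            throw new IllegalArgumentException("unsupported element: " + element);
        }
    }

    // Returns Object, so the caller has to cast to the correct type.
    static Object deserialize(DataInputStream in) throws IOException {
        long first = in.readLong();
        if (first == IS_WATERMARK) {
            return new Mark(in.readLong());
        }
        return new Record(first, in.readUTF());
    }

    // Convenience helper for trying the sketch: serialize, then read back.
    static Object roundTrip(Object element) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            serialize(element, out);
            out.flush();
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new Record(42L, "hello")) instanceof Record);
        System.out.println(roundTrip(new Mark(100L)) instanceof Mark);
    }
}
```

    One consequence of this design is that Long.MIN_VALUE is no longer usable as an ordinary
    record timestamp, which is why the sketch rejects it on the write path.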


> Introduce (Event)time in Streaming
> ----------------------------------
>
>                 Key: FLINK-1967
>                 URL: https://issues.apache.org/jira/browse/FLINK-1967
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in the streaming record and changing the sources
> to add timestamps to records. This will also introduce punctuations (or low watermarks) to
> allow windows to work correctly on unordered, timestamped input data. In the process,
> the windowing subsystem also needs to be adapted to use the punctuations. Furthermore, all
> operators need to be made aware of punctuations and must forward them correctly. Then, a new
> operator must be introduced to allow modification of timestamps.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
