flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8470) DelayTrigger and DelayAndCountTrigger in Flink Streaming Window API
Date Wed, 24 Jan 2018 10:29:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16337333#comment-16337333 ]

ASF GitHub Bot commented on FLINK-8470:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5342#discussion_r163500641
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
---
    @@ -0,0 +1,328 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.state.MapState;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
    +
    +// TODO: Make bucket granularity adaptable
    +
    +/**
    + * A TwoInputStreamOperator to execute time-bounded stream inner joins.
    + * <p>
    + * By using a configurable lower and upper bound this operator will emit exactly those pairs
    + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
    + * upper bound can be configured to be either inclusive or exclusive.
    + *
    + * @param <T1> The type of the elements in the left stream
    + * @param <T2> The type of the elements in the right stream
    + */
    +public class TimeBoundedStreamJoinOperator<T1, T2>
    +	extends AbstractStreamOperator<Tuple2<T1, T2>>
    +	implements TwoInputStreamOperator<T1, T2, Tuple2<T1, T2>> {
    +
    +	private final long lowerBound;
    +	private final long upperBound;
    +
    +	private final long inverseLowerBound;
    +	private final long inverseUpperBound;
    +
    +	private final boolean lowerBoundInclusive;
    +	private final boolean upperBoundInclusive;
    +
    +	private final long bucketGranularity = 1;
    +
    +	private static final String LEFT_BUFFER = "LEFT_BUFFER";
    +	private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    +	private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
    +	private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
    +
    +	private transient ValueState<Long> lastCleanupRightBuffer;
    +	private transient ValueState<Long> lastCleanupLeftBuffer;
    +
    +	private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
    +	private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
    +
    +	private transient TimestampedCollector<Tuple2<T1, T2>> collector;
    +
    +	/**
    +	 * Creates a new TimeBoundedStreamJoinOperator
    +	 *
    +	 * @param lowerBound          The lower bound for evaluating if elements should be joined
    +	 * @param upperBound          The upper bound for evaluating if elements should be joined
    +	 * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
    +	 *                            the lower bound
    +	 * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
    +	 *                            the upper bound
    +	 */
    +	public TimeBoundedStreamJoinOperator(long lowerBound,
    +										 long upperBound,
    +										 boolean lowerBoundInclusive,
    +										 boolean upperBoundInclusive) {
    +
    +		this.lowerBound = lowerBound;
    +		this.upperBound = upperBound;
    +
    +		this.inverseLowerBound = -1 * upperBound;
    +		this.inverseUpperBound = -1 * lowerBound;
    +
    +		this.lowerBoundInclusive = lowerBoundInclusive;
    +		this.upperBoundInclusive = upperBoundInclusive;
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		super.open();
    +		collector = new TimestampedCollector<>(output);
    +
    +		this.leftBuffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
    +			LEFT_BUFFER,
    +			LONG_TYPE_INFO,
    +			TypeInformation.of(new TypeHint<List<Tuple3<T1, Long, Boolean>>>() {
    +			})
    +		));
    +
    +		this.rightBuffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
    +			RIGHT_BUFFER,
    +			LONG_TYPE_INFO,
    +			TypeInformation.of(new TypeHint<List<Tuple3<T2, Long, Boolean>>>() {
    +			})
    +		));
    +
    +		this.lastCleanupRightBuffer = getRuntimeContext().getState(new ValueStateDescriptor<>(
    +			LAST_CLEANUP_RIGHT,
    +			LONG_TYPE_INFO
    +		));
    +
    +		this.lastCleanupLeftBuffer = getRuntimeContext().getState(new ValueStateDescriptor<>(
    +			LAST_CLEANUP_LEFT,
    +			LONG_TYPE_INFO
    +		));
    +	}
    +
    +	/**
    +	 * Process a {@link StreamRecord<T1>} from the left stream. Whenever a {@link StreamRecord<T1>}
    +	 * arrives at the left stream, it will get added to the left buffer. Possible join candidates
    +	 * for that element will be looked up from the right buffer and if the pair lies within the
    +	 * user defined boundaries, it gets collected.
    +	 *
    +	 * @param record An incoming record to be joined
    +	 * @throws Exception Can throw an Exception during state access
    +	 */
    +	@Override
    +	public void processElement1(StreamRecord<T1> record) throws Exception {
    +
    +		long leftTs = record.getTimestamp();
    +		T1 leftValue = record.getValue();
    +
    +		addToLeftBuffer(leftValue, leftTs);
    +
    +		getJoinCandidatesForLeftElement(leftTs)
    --- End diff --
    
    This code creates unnecessary copies, as it first copies everything from the state into a list and then iterates over that list to check which elements can be joined. Why not keep the for-loop and write something like:
    
    ```
    for (long i = lowerBound; i <= upperBound; i++) {
    	List<Tuple3<T2, Long, Boolean>> fromBucket = rightBuffer.get(leftTs + i);
    	if (fromBucket != null) {
    		for (Tuple3<T2, Long, Boolean> candidate : fromBucket) {
    			if (shouldBeJoined(leftTs, candidate.f1)) {
    				collect(leftValue, candidate.f0, leftTs);
    			}
    		}
    	}
    }
    ```
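
    The snippet above defers the inclusive/exclusive decision to `shouldBeJoined`. As a minimal sketch, assuming that check only compares the timestamp difference against the configured bounds (the actual method in the PR may differ):

    ```
    // Illustrative only: not the PR's implementation; field names are assumptions.
    private boolean shouldBeJoined(long leftTs, long rightTs) {
    	long diff = rightTs - leftTs;

    	// respect the configurable inclusive/exclusive bounds described in the operator javadoc
    	boolean withinLowerBound = lowerBoundInclusive ? diff >= lowerBound : diff > lowerBound;
    	boolean withinUpperBound = upperBoundInclusive ? diff <= upperBound : diff < upperBound;

    	return withinLowerBound && withinUpperBound;
    }
    ```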


> DelayTrigger and DelayAndCountTrigger in Flink Streaming Window API
> -------------------------------------------------------------------
>
>                 Key: FLINK-8470
>                 URL: https://issues.apache.org/jira/browse/FLINK-8470
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 2.0.0
>            Reporter: Vijay Kansal
>            Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> In the Flink streaming API, we do not have any in-built window trigger(s) available for the below use cases:
> 1. DelayTrigger: The window function should trigger in case the 1st element belonging to this window arrived more than maxDelay (ms) before the current processing time.
> 2. DelayAndCountTrigger: The window function should trigger in case the 1st element belonging to this window arrived more than maxDelay (ms) before the current processing time, or there are more than maxCount elements in the window.
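
As a rough illustration, here is a minimal sketch of what such a DelayTrigger could look like on top of Flink's public Trigger API; the class itself, the state name, and the exact firing semantics are assumptions for discussion and not part of Flink:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * Illustrative sketch: fires a window once its first element arrived more than
 * maxDelay milliseconds (processing time) ago.
 */
public class DelayTrigger<W extends Window> extends Trigger<Object, W> {

	private final long maxDelay;

	// per-window state remembering when the first element of the window arrived
	private final ValueStateDescriptor<Long> firstSeenDescriptor =
		new ValueStateDescriptor<>("first-seen", Long.class);

	private DelayTrigger(long maxDelay) {
		this.maxDelay = maxDelay;
	}

	public static <W extends Window> DelayTrigger<W> of(long maxDelayMillis) {
		return new DelayTrigger<>(maxDelayMillis);
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
		ValueState<Long> firstSeen = ctx.getPartitionedState(firstSeenDescriptor);
		if (firstSeen.value() == null) {
			// first element of this window: remember its arrival time and schedule the delayed firing
			long now = ctx.getCurrentProcessingTime();
			firstSeen.update(now);
			ctx.registerProcessingTimeTimer(now + maxDelay);
		}
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
		// maxDelay has elapsed since the first element arrived
		return TriggerResult.FIRE;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		ValueState<Long> firstSeen = ctx.getPartitionedState(firstSeenDescriptor);
		Long first = firstSeen.value();
		if (first != null) {
			ctx.deleteProcessingTimeTimer(first + maxDelay);
		}
		firstSeen.clear();
	}
}
```

It could then be used as, e.g., `stream.keyBy(...).window(...).trigger(DelayTrigger.of(60_000))`; a DelayAndCountTrigger along the lines of the issue would additionally keep a per-window element count in state and fire once it exceeds maxCount.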



