From: hequn8128
To: issues@flink.apache.org
Reply-To: issues@flink.apache.org
Subject: [GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join
Message-Id: <20180130032711.1A7C9E04AA@git1-us-west.apache.org>
Date: Tue, 30 Jan 2018 03:27:11 +0000 (UTC)

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5342#discussion_r164631861

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * <p>By using a configurable lower and upper bound, this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [t1.ts + lowerBound, t1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * <p>As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the right element.
+ *
+ * @param <T1>  The type of the elements in the left stream
+ * @param <T2>  The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+    extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+    implements TwoInputStreamOperator<T1, T2, OUT> {
+
+    private final long lowerBound;
+    private final long upperBound;
+
+    private final long inverseLowerBound;
+    private final long inverseUpperBound;
+
+    private final boolean lowerBoundInclusive;
+    private final boolean upperBoundInclusive;
+
+    private final long bucketGranularity = 1;
+
+    private static final String LEFT_BUFFER = "LEFT_BUFFER";
+    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+    private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+    private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+    private transient ValueState<Long> lastCleanupRightBuffer;
+    private transient ValueState<Long> lastCleanupLeftBuffer;
+
+    private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+    private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+    private final TypeSerializer<T1> leftTypeSerializer;
+    private final TypeSerializer<T2> rightTypeSerializer;
+
+    private transient TimestampedCollector<OUT> collector;
+
+    private ContextImpl context;
+
+    /**
+     * Creates a new TimeBoundedStreamJoinOperator.
+     *
+     * @param lowerBound          The lower bound for evaluating if elements should be joined
+     * @param upperBound          The upper bound for evaluating if elements should be joined
+     * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches
+     *                            the lower bound
+     * @param upperBoundInclusive Whether or not to include elements where the timestamp matches
+     *                            the upper bound
+     * @param udf                 A user-defined {@link JoinedProcessFunction} that gets called
+     *                            whenever two elements of T1 and T2 are joined
+     */
+    public TimeBoundedStreamJoinOperator(
+            long lowerBound,
+            long upperBound,
+            boolean lowerBoundInclusive,
+            boolean upperBoundInclusive,
+            TypeSerializer<T1> leftTypeSerializer,
+            TypeSerializer<T2> rightTypeSerializer,
+            JoinedProcessFunction<T1, T2, OUT> udf
+    ) {
+
+        super(udf);
+
+        this.lowerBound = lowerBound;
+        this.upperBound = upperBound;
+
+        this.inverseLowerBound = -1 * upperBound;
+        this.inverseUpperBound = -1 * lowerBound;
+
+        this.lowerBoundInclusive = lowerBoundInclusive;
+        this.upperBoundInclusive = upperBoundInclusive;
+        this.leftTypeSerializer = leftTypeSerializer;
+        this.rightTypeSerializer = rightTypeSerializer;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        collector = new TimestampedCollector<>(output);
+        context = new ContextImpl(userFunction);
+
+        Class<Tuple3<T1, Long, Boolean>> leftTypedTuple =
+            (Class<Tuple3<T1, Long, Boolean>>) (Class) Tuple3.class;
+
+        TupleSerializer<Tuple3<T1, Long, Boolean>> leftTupleSerializer = new TupleSerializer<>(
+            leftTypedTuple,
+            new TypeSerializer[]{
+                leftTypeSerializer,
+                LongSerializer.INSTANCE,
+                BooleanSerializer.INSTANCE
+            }
+        );
+
+        Class<Tuple3<T2, Long, Boolean>> rightTypedTuple =
+            (Class<Tuple3<T2, Long, Boolean>>) (Class) Tuple3.class;
+
+        TupleSerializer<Tuple3<T2, Long, Boolean>> rightTupleSerializer = new TupleSerializer<>(
+            rightTypedTuple,
+            new TypeSerializer[]{
+                rightTypeSerializer,
+                LongSerializer.INSTANCE,
+                BooleanSerializer.INSTANCE
+            }
+        );
+
+        this.leftBuffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
+            LEFT_BUFFER,
+            LongSerializer.INSTANCE,
+            new ListSerializer<>(leftTupleSerializer)
+        ));
+
+        this.rightBuffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
+            RIGHT_BUFFER,
+            LongSerializer.INSTANCE,
+            new ListSerializer<>(rightTupleSerializer)
+        ));
+
+        this.lastCleanupRightBuffer = getRuntimeContext().getState(new ValueStateDescriptor<>(
+            LAST_CLEANUP_RIGHT,
+            LONG_TYPE_INFO
+        ));
+
+        this.lastCleanupLeftBuffer = getRuntimeContext().getState(new ValueStateDescriptor<>(
+            LAST_CLEANUP_LEFT,
+            LONG_TYPE_INFO
+        ));
+    }
+
+    /**
+     * Process a {@link StreamRecord} from the left stream. Whenever a {@link StreamRecord}
+     * arrives at the left stream, it will get added to the left buffer. Possible join candidates
+     * for that element will be looked up from the right buffer and, if the pair lies within the
+     * user-defined boundaries, it gets collected.
+     *
+     * @param record An incoming record to be joined
+     * @throws Exception Can throw an Exception during state access
+     */
+    @Override
+    public void processElement1(StreamRecord<T1> record) throws Exception {
+
+        long leftTs = record.getTimestamp();
+        T1 leftValue = record.getValue();
+
+        addToLeftBuffer(leftValue, leftTs);
+
+        long min = leftTs + lowerBound;
+        long max = leftTs + upperBound;
+
+        // TODO: Adapt to different bucket sizes here
+        // Go over all buckets that are within the time bounds
+        for (long i = min; i <= max; i++) {
--- End diff --

I think iterating the MapState is better, since the data may be very sparse. What do you think?

---
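[Editor's note] As a rough, self-contained sketch of the alternative the comment suggests: instead of probing every candidate timestamp between min and max, walk only the buckets that actually exist and keep those whose key falls inside the window. A plain java.util.TreeMap stands in for Flink's MapState here (MapState does expose entries()/iterator() for the same pattern), the String element type and the method name joinCandidates are made up for the example, and the operator's inclusive/exclusive bound handling is left out.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class SparseBucketLookup {

        // Stand-in for the operator's MapState<Long, List<...>> buffer.
        static final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();

        // Instead of calling get(i) for every timestamp i in [min, max],
        // iterate the existing buckets and filter by key.
        static List<String> joinCandidates(long min, long max) {
            List<String> candidates = new ArrayList<>();
            for (Map.Entry<Long, List<String>> bucket : rightBuffer.entrySet()) {
                long ts = bucket.getKey();
                if (ts >= min && ts <= max) {
                    candidates.addAll(bucket.getValue());
                }
            }
            return candidates;
        }

        public static void main(String[] args) {
            rightBuffer.put(5L, List.of("a"));
            rightBuffer.put(1_000_000L, List.of("b"));   // sparse: huge gap between buckets
            System.out.println(joinCandidates(0L, 10L)); // prints [a]
        }
    }

The trade-off is roughly O(number of stored buckets) for the iteration versus O(window width) for probing every timestamp, so which variant wins depends on how sparse the buffered data is relative to the join window.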