flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From florianschmidt1994 <...@git.apache.org>
Subject [GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...
Date Tue, 24 Apr 2018 13:23:16 GMT
Github user florianschmidt1994 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5482#discussion_r183727245
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
---
    @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) 
{
     			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<?
super TaggedUnion<T1, T2>, W> assigner) {
     				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
assigner, null, null);
     			}
    +
    +			/**
    +			 * Specifies the time boundaries over which the join operation works, so that
    +			 * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <=
leftElement.timestamp + upperBound</pre>
    +			 * By default both the lower and the upper bound are inclusive. This can be configured
    +			 * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
    +			 * {@link TimeBounded#upperBoundExclusive(boolean)}
    +			 *
    +			 * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
    +			 * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
    +			 */
    +			public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
    +
    +				TimeCharacteristic timeCharacteristic =
    +					input1.getExecutionEnvironment().getStreamTimeCharacteristic();
    +
    +				if (timeCharacteristic != TimeCharacteristic.EventTime) {
    +					throw new RuntimeException("Time-bounded stream joins are only supported in event
time");
    +				}
    +
    +				checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
    +				checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded
join");
    +				return new TimeBounded<>(
    +					input1,
    +					input2,
    +					lowerBound.toMilliseconds(),
    +					upperBound.toMilliseconds(),
    +					true,
    +					true,
    +					keySelector1,
    +					keySelector2
    +				);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Joined streams that have keys for both sides as well as the time boundaries over
which
    +	 * elements should be joined defined.
    +	 *
    +	 * @param <IN1> Input type of elements from the first stream
    +	 * @param <IN2> Input type of elements from the second stream
    +	 * @param <KEY> The type of the key
    +	 */
    +	public static class TimeBounded<IN1, IN2, KEY> {
    +
    +		private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin";
    +
    +		private final DataStream<IN1> left;
    +		private final DataStream<IN2> right;
    +
    +		private final long lowerBound;
    +		private final long upperBound;
    +
    +		private final KeySelector<IN1, KEY> keySelector1;
    +		private final KeySelector<IN2, KEY> keySelector2;
    +
    +		private boolean lowerBoundInclusive;
    +		private boolean upperBoundInclusive;
    +
    +		public TimeBounded(
    +			DataStream<IN1> left,
    +			DataStream<IN2> right,
    +			long lowerBound,
    +			long upperBound,
    +			boolean lowerBoundInclusive,
    +			boolean upperBoundInclusive,
    +			KeySelector<IN1, KEY> keySelector1,
    +			KeySelector<IN2, KEY> keySelector2) {
    +
    +			this.left = Preconditions.checkNotNull(left);
    +			this.right = Preconditions.checkNotNull(right);
    +
    +			this.lowerBound = lowerBound;
    +			this.upperBound = upperBound;
    +
    +			this.lowerBoundInclusive = lowerBoundInclusive;
    +			this.upperBoundInclusive = upperBoundInclusive;
    +
    +			this.keySelector1 = Preconditions.checkNotNull(keySelector1);
    +			this.keySelector2 = Preconditions.checkNotNull(keySelector2);
    +		}
    +
    +		/**
    +		 * Configure whether the upper bound should be considered exclusive or inclusive.
    +		 */
    +		public TimeBounded<IN1, IN2, KEY> upperBoundExclusive(boolean exclusive) {
    +			this.upperBoundInclusive = !exclusive;
    +			return this;
    +		}
    +
    +		/**
    +		 * Configure whether the lower bound should be considered exclusive or inclusive.
    +		 */
    +		public TimeBounded<IN1, IN2, KEY> lowerBoundExclusive(boolean exclusive) {
    +			this.lowerBoundInclusive = !exclusive;
    +			return this;
    +		}
    +
    +		/**
    +		 * Completes the join operation with the user function that is executed for each joined
pair
    +		 * of elements.
    +		 * @param udf The user-defined function
    +		 * @param <OUT> The output type
    +		 * @return Returns a DataStream
    +		 */
    +		public <OUT> DataStream<OUT> process(TimeBoundedJoinFunction<IN1, IN2,
OUT> udf) {
    +
    +			ConnectedStreams<IN1, IN2> connected = left.connect(right);
    +
    +			udf = left.getExecutionEnvironment().clean(udf);
    +
    +			TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType(
    +				udf,
    +				TimeBoundedJoinFunction.class,    // TimeBoundedJoinFunction<IN1, IN2, OUT>
    +				0,                                //						  0    1    2
    +				1,
    +				2,
    +				new int[]{0},                   // lambda input 1 type arg indices
    +				new int[]{1},                   // lambda input 1 type arg indices
    +				TypeExtractor.NO_INDEX,         // output arg indices
    +				left.getType(),                 // input 1 type information
    +				right.getType(),                // input 1 type information
    +				TIMEBOUNDED_JOIN_FUNC_NAME,
    +				false
    +			);
    +
    +			long bucketGranularity = Time.seconds(1).toMilliseconds();
    --- End diff --
    
    So I built a POC which supports setting it like this:
    ```java
    streamOne
      .join(streamTwo)
      .where(new Tuple2KeyExtractor())
      .equalTo(new Tuple2KeyExtractor())
      .between(Time.milliseconds(0), Time.milliseconds(2))
      .withBucketGranularity(Time.hours(1)) // this one here
      .process(new CombineToStringJoinFunction())
      .addSink(new ResultSink());
    ```
    but I'm really not sure about this. To me it feels like the wrong level of abstraction
at that point. Maybe @kl0u or @aljoscha have some ideas on where this would fit best? Also,
putting it in the `flink-conf.yaml` feels a little bit too low-level.


---

Mime
View raw message