flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Date Wed, 19 Oct 2016 10:17:16 GMT
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84038706
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
---
    @@ -390,4 +425,141 @@ public void close() {
     			output.close();
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Watermark handling
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a {@link InternalTimerService} that can be used to query current processing
time
    +	 * and event time and to set timers. An operator can have several timer services, where
    +	 * each has its own namespace serializer. Timer services are differentiated by the string
    +	 * key that is given when requesting them, if you call this method with the same key
    +	 * multiple times you will get the same timer service instance in subsequent requests.
    +	 *
    +	 * <p>Timers are always scoped to a key, the currently active key of a keyed stream
operation.
    +	 * When a timer fires, this key will also be set as the currently active key.
    +	 *
    +	 * <p>Each timer has attached metadata, the namespace. Different timer services
    +	 * can have a different namespace type. If you don't need namespace differentiation
you
    +	 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
    +	 *
    +	 * @param name The name of the requested timer service. If no service exists under the
given
    +	 *             name a new one will be created and returned.
    +	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
    +	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
    +	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
    +	 *
    +	 * @param <K> The type of the timer keys.
    +	 * @param <N> The type of the timer namespace.
    +	 */
    +	public <K, N> InternalTimerService<N> getInternalTimerService(
    +			String name,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			Triggerable<K, N> triggerable) {
    +
    +		@SuppressWarnings("unchecked")
    +		HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>)
timerServices.get(name);
    +
    +		if (service == null) {
    +			if (restoredServices != null && restoredServices.containsKey(name)) {
    +				@SuppressWarnings("unchecked")
    +				HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    +						(HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
    +
    +				service = new HeapInternalTimerService<>(
    +						keySerializer,
    +						namespaceSerializer,
    +						triggerable,
    +						this,
    +						getRuntimeContext().getProcessingTimeService(),
    +						restoredService);
    +
    +			} else {
    +				service = new HeapInternalTimerService<>(
    +						keySerializer,
    +						namespaceSerializer,
    +						triggerable,
    +						this,
    +						getRuntimeContext().getProcessingTimeService());
    +			}
    +			timerServices.put(name, service);
    +		}
    +
    +		return service;
    +	}
    +
    +	public void processWatermark(Watermark mark) throws Exception {
    +		for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
    +			service.advanceWatermark(mark.getTimestamp());
    +		}
    +		output.emitWatermark(mark);
    +	}
    +
    +	public void processWatermark1(Watermark mark) throws Exception {
    +		input1Watermark = mark.getTimestamp();
    +		long newMin = Math.min(input1Watermark, input2Watermark);
    +		if (newMin > combinedWatermark) {
    +			combinedWatermark = newMin;
    +			processWatermark(new Watermark(combinedWatermark));
    +		}
    +	}
    +
    +	public void processWatermark2(Watermark mark) throws Exception {
    --- End diff --
    
    Jip, I'm already trying to address that in another PR for changing the operator hierarchy.
😄 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message