flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request #3347: [FLINK-5716] [streaming] Make StreamSourceContexts...
Date Wed, 22 Feb 2017 12:39:31 GMT
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3347#discussion_r102431062
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---
    @@ -247,45 +282,220 @@ public void onProcessingTime(long timestamp) {
     	 * Streaming topologies can use timestamp assigner functions to override the timestamps
     	 * assigned here.
     	 */
    -	private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
    +	private static class ManualWatermarkContext<T> extends WatermarkContext<T> {
     
    -		private final Object lock;
     		private final Output<StreamRecord<T>> output;
     		private final StreamRecord<T> reuse;
     
    -		private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
    -			this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
    +		private ManualWatermarkContext(
    +				final Output<StreamRecord<T>> output,
    +				final ProcessingTimeService timeService,
    +				final Object checkpointLock,
    +				final StreamStatusMaintainer streamStatusMaintainer,
    +				final long idleTimeout) {
    +
    +			super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
    +
     			this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
     			this.reuse = new StreamRecord<>(null);
     		}
     
     		@Override
    +		protected void processAndCollect(T element) {
    +			output.collect(reuse.replace(element));
    +		}
    +
    +		@Override
    +		protected void processAndCollectWithTimestamp(T element, long timestamp) {
    +			output.collect(reuse.replace(element, timestamp));
    +		}
    +
    +		@Override
    +		protected void processAndEmitWatermark(Watermark mark) {
    +			output.emitWatermark(mark);
    +		}
    +
    +		@Override
    +		protected boolean allowWatermark(Watermark mark) {
    +			return true;
    +		}
    +	}
    +
    +	/**
    +	 * An asbtract {@link SourceFunction.SourceContext} that should be used as the base for
    --- End diff --
    
    Typo: `asbtract`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message