Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4E5631880F for ; Tue, 20 Oct 2015 16:42:33 +0000 (UTC) Received: (qmail 67471 invoked by uid 500); 20 Oct 2015 16:42:20 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 67424 invoked by uid 500); 20 Oct 2015 16:42:20 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 67360 invoked by uid 99); 20 Oct 2015 16:42:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Oct 2015 16:42:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6ADB4DFD86; Tue, 20 Oct 2015 16:42:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Date: Tue, 20 Oct 2015 16:42:22 -0000 Message-Id: <4d6adc465aeb49449686d03d5449db9e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/4] flink git commit: [FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index aecfd5d..7ab33cf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -18,12 +18,15 @@ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.annotations.VisibleForTesting; - +import org.apache.commons.lang.SerializationUtils; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -37,11 +40,16 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -65,26 +73,70 @@ public class NonKeyedWindowOperator private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(NonKeyedWindowOperator.class); + private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class); + // ------------------------------------------------------------------------ + // Configuration values and stuff from the user + // ------------------------------------------------------------------------ private final WindowAssigner windowAssigner; - private final Trigger triggerTemplate; + private final Trigger trigger; + private final WindowBufferFactory> windowBufferFactory; - protected transient Map, TriggerContext>> windows; + /** + * If this is true. The current processing time is set as the timestamp of incoming elements. + * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} + * if eviction should happen based on processing time. + */ + private boolean setProcessingTime = false; - private transient Map> processingTimeTimers; - private transient Map> watermarkTimers; + /** + * This is used to copy the incoming element because it can be put into several window + * buffers. + */ + private TypeSerializer inputSerializer; + /** + * For serializing the window in checkpoints. + */ + private final TypeSerializer windowSerializer; + + // ------------------------------------------------------------------------ + // State that is not checkpointed + // ------------------------------------------------------------------------ + + /** + * Processing time timers that are currently in-flight. + */ + private transient Map> processingTimeTimers; + + /** + * Current waiting watermark callbacks. 
+ */ + private transient Map> watermarkTimers; + + /** + * This is given to the {@code WindowFunction} for emitting elements with a given timestamp. + */ protected transient TimestampedCollector timestampedCollector; - private boolean setProcessingTime = false; + // ------------------------------------------------------------------------ + // State that needs to be checkpointed + // ------------------------------------------------------------------------ - private TypeSerializer inputSerializer; + /** + * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer} + * and a {@code TriggerContext} that stores the {@code Trigger} for that pane. + */ + protected transient Map windows; + /** + * Creates a new {@code WindowOperator} based on the given policies and user functions. + */ public NonKeyedWindowOperator(WindowAssigner windowAssigner, + TypeSerializer windowSerializer, WindowBufferFactory> windowBufferFactory, AllWindowFunction windowFunction, Trigger trigger) { @@ -92,25 +144,23 @@ public class NonKeyedWindowOperator super(windowFunction); this.windowAssigner = requireNonNull(windowAssigner); + this.windowSerializer = windowSerializer; this.windowBufferFactory = requireNonNull(windowBufferFactory); - this.triggerTemplate = requireNonNull(trigger); + this.trigger = requireNonNull(trigger); setChainingStrategy(ChainingStrategy.ALWAYS); } @Override @SuppressWarnings("unchecked") - public void setInputType(TypeInformation type, ExecutionConfig executionConfig) { + public final void setInputType(TypeInformation type, ExecutionConfig executionConfig) { inputSerializer = (TypeSerializer) type.createSerializer(executionConfig); } @Override - public void open() throws Exception { + public final void open() throws Exception { super.open(); - windows = new HashMap<>(); - watermarkTimers = new HashMap<>(); - processingTimeTimers = new HashMap<>(); timestampedCollector = new TimestampedCollector<>(output); if (inputSerializer == null) { @@ 
-119,14 +169,47 @@ public class NonKeyedWindowOperator windowBufferFactory.setRuntimeContext(getRuntimeContext()); windowBufferFactory.open(getUserFunctionParameters()); + + // these could already be initialized from restoreState() + if (watermarkTimers == null) { + watermarkTimers = new HashMap<>(); + } + if (processingTimeTimers == null) { + processingTimeTimers = new HashMap<>(); + } + if (windows == null) { + windows = new HashMap<>(); + } + + // re-register timers that this window context had set + for (Context context: windows.values()) { + if (context.processingTimeTimer > 0) { + Set triggers = processingTimeTimers.get(context.processingTimeTimer); + if (triggers == null) { + getRuntimeContext().registerTimer(context.processingTimeTimer, NonKeyedWindowOperator.this); + triggers = new HashSet<>(); + processingTimeTimers.put(context.processingTimeTimer, triggers); + } + triggers.add(context); + } + if (context.watermarkTimer > 0) { + Set triggers = watermarkTimers.get(context.watermarkTimer); + if (triggers == null) { + triggers = new HashSet<>(); + watermarkTimers.put(context.watermarkTimer, triggers); + } + triggers.add(context); + } + + } } @Override - public void close() throws Exception { + public final void close() throws Exception { super.close(); // emit the elements that we still keep - for (W window: windows.keySet()) { - emitWindow(window, false); + for (Context window: windows.values()) { + emitWindow(window); } windows.clear(); windowBufferFactory.close(); @@ -134,58 +217,60 @@ public class NonKeyedWindowOperator @Override @SuppressWarnings("unchecked") - public void processElement(StreamRecord element) throws Exception { + public final void processElement(StreamRecord element) throws Exception { if (setProcessingTime) { element.replace(element.getValue(), System.currentTimeMillis()); } + Collection elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp()); for (W window: elementWindows) { - Tuple2, TriggerContext> 
bufferAndTrigger = windows.get(window); - if (bufferAndTrigger == null) { - bufferAndTrigger = new Tuple2<>(); - bufferAndTrigger.f0 = windowBufferFactory.create(); - bufferAndTrigger.f1 = new TriggerContext(window, triggerTemplate.duplicate()); - windows.put(window, bufferAndTrigger); + Context context = windows.get(window); + if (context == null) { + WindowBuffer windowBuffer = windowBufferFactory.create(); + context = new Context(window, windowBuffer); + windows.put(window, context); } StreamRecord elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()); - bufferAndTrigger.f0.storeElement(elementCopy); - Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1); + context.windowBuffer.storeElement(elementCopy); + Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context); processTriggerResult(triggerResult, window); } } - protected void emitWindow(W window, boolean purge) throws Exception { - timestampedCollector.setTimestamp(window.getEnd()); - - Tuple2, TriggerContext> bufferAndTrigger; - if (purge) { - bufferAndTrigger = windows.remove(window); - } else { - bufferAndTrigger = windows.get(window); - } - - if (bufferAndTrigger == null) { - LOG.debug("Window {} already gone.", window); - return; - } - + protected void emitWindow(Context context) throws Exception { + timestampedCollector.setTimestamp(context.window.maxTimestamp()); userFunction.apply( - window, - bufferAndTrigger.f0.getUnpackedElements(), + context.window, + context.windowBuffer.getUnpackedElements(), timestampedCollector); } private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception { switch (triggerResult) { - case FIRE: - emitWindow(window, false); + case FIRE: { + Context context = windows.get(window); + if (context == null) { + LOG.debug("Window {} 
already gone.", window); + return; + } + + + emitWindow(context); break; + } - case FIRE_AND_PURGE: - emitWindow(window, true); + case FIRE_AND_PURGE: { + Context context = windows.remove(window); + if (context == null) { + LOG.debug("Window {} already gone.", window); + return; + } + + emitWindow(context); break; + } case CONTINUE: // ingore @@ -193,14 +278,14 @@ public class NonKeyedWindowOperator } @Override - public void processWatermark(Watermark mark) throws Exception { + public final void processWatermark(Watermark mark) throws Exception { Set toRemove = new HashSet<>(); - for (Map.Entry> triggers: watermarkTimers.entrySet()) { + for (Map.Entry> triggers: watermarkTimers.entrySet()) { if (triggers.getKey() <= mark.getTimestamp()) { - for (TriggerContext trigger: triggers.getValue()) { - Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger); - processTriggerResult(triggerResult, trigger.window); + for (Context context: triggers.getValue()) { + Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey()); + processTriggerResult(triggerResult, context.window); } toRemove.add(triggers.getKey()); } @@ -213,14 +298,14 @@ public class NonKeyedWindowOperator } @Override - public void trigger(long time) throws Exception { + public final void trigger(long time) throws Exception { Set toRemove = new HashSet<>(); - for (Map.Entry> triggers: processingTimeTimers.entrySet()) { + for (Map.Entry> triggers: processingTimeTimers.entrySet()) { if (triggers.getKey() < time) { - for (TriggerContext trigger: triggers.getValue()) { - Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger); - processTriggerResult(triggerResult, trigger.window); + for (Context context: triggers.getValue()) { + Trigger.TriggerResult triggerResult = context.onProcessingTime(time); + processTriggerResult(triggerResult, context.window); } toRemove.add(triggers.getKey()); } @@ -231,35 +316,139 @@ public class NonKeyedWindowOperator 
} } - protected class TriggerContext implements Trigger.TriggerContext { - Trigger trigger; - W window; + /** + * A context object that is given to {@code Trigger} functions to allow them to register + * timer/watermark callbacks. + */ + protected class Context implements Trigger.TriggerContext { + protected W window; + + protected WindowBuffer windowBuffer; + + protected HashMap state; + + // use these to only allow one timer in flight at a time of each type + // if the trigger registers another timer this value here will be overwritten, + // the timer is not removed from the set of in-flight timers to improve performance. + // When a trigger fires it is just checked against the last timer that was set. + protected long watermarkTimer; + protected long processingTimeTimer; - public TriggerContext(W window, Trigger trigger) { + public Context( + W window, + WindowBuffer windowBuffer) { this.window = window; - this.trigger = trigger; + this.windowBuffer = windowBuffer; + state = new HashMap<>(); + + this.watermarkTimer = -1; + this.processingTimeTimer = -1; + } + + + @SuppressWarnings("unchecked") + protected Context(DataInputView in) throws Exception { + this.window = windowSerializer.deserialize(in); + this.watermarkTimer = in.readLong(); + this.processingTimeTimer = in.readLong(); + + int stateSize = in.readInt(); + byte[] stateData = new byte[stateSize]; + in.read(stateData); + ByteArrayInputStream bais = new ByteArrayInputStream(stateData); + state = (HashMap) SerializationUtils.deserialize(bais); + + this.windowBuffer = windowBufferFactory.create(); + int numElements = in.readInt(); + MultiplexingStreamRecordSerializer recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); + for (int i = 0; i < numElements; i++) { + windowBuffer.storeElement(recordSerializer.deserialize(in).asRecord()); + } + } + + protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException { + windowSerializer.serialize(window, out); + 
out.writeLong(watermarkTimer); + out.writeLong(processingTimeTimer); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + SerializationUtils.serialize(state, baos); + out.writeInt(baos.size()); + out.write(baos.toByteArray(), 0, baos.size()); + + MultiplexingStreamRecordSerializer recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); + out.writeInt(windowBuffer.size()); + for (StreamRecord element: windowBuffer.getElements()) { + recordSerializer.serialize(element, out); + } + } + + @SuppressWarnings("unchecked") + public OperatorState getKeyValueState(final String name, final S defaultState) { + return new OperatorState() { + @Override + public S value() throws IOException { + Serializable value = state.get(name); + if (value == null) { + state.put(name, defaultState); + value = defaultState; + } + return (S) value; + } + + @Override + public void update(S value) throws IOException { + state.put(name, value); + } + }; } @Override public void registerProcessingTimeTimer(long time) { - Set triggers = processingTimeTimers.get(time); + if (this.processingTimeTimer == time) { + // we already have set a trigger for that time + return; + } + Set triggers = processingTimeTimers.get(time); if (triggers == null) { getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this); triggers = new HashSet<>(); processingTimeTimers.put(time, triggers); } + this.processingTimeTimer = time; triggers.add(this); } @Override public void registerWatermarkTimer(long time) { - Set triggers = watermarkTimers.get(time); + if (watermarkTimer == time) { + // we already have set a trigger for that time + return; + } + Set triggers = watermarkTimers.get(time); if (triggers == null) { triggers = new HashSet<>(); watermarkTimers.put(time, triggers); } + this.watermarkTimer = time; triggers.add(this); } + + public Trigger.TriggerResult onProcessingTime(long time) throws Exception { + if (time == processingTimeTimer) { + return trigger.onTime(time, this); + 
} else { + return Trigger.TriggerResult.CONTINUE; + } + } + + public Trigger.TriggerResult onEventTime(long time) throws Exception { + if (time == watermarkTimer) { + return trigger.onTime(time, this); + } else { + return Trigger.TriggerResult.CONTINUE; + } + } } /** @@ -274,7 +463,7 @@ public class NonKeyedWindowOperator } @Override - public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + public final void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { if (userFunction instanceof OutputTypeConfigurable) { @SuppressWarnings("unchecked") OutputTypeConfigurable typeConfigurable = (OutputTypeConfigurable) userFunction; @@ -283,12 +472,59 @@ public class NonKeyedWindowOperator } // ------------------------------------------------------------------------ + // Checkpointing + // ------------------------------------------------------------------------ + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + + // we write the panes with the key/value maps into the stream + StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + + int numWindows = windows.size(); + out.writeInt(numWindows); + for (Context context: windows.values()) { + context.writeToState(out); + } + + taskState.setOperatorState(out.closeAndGetHandle()); + return taskState; + } + + @Override + public void restoreState(StreamTaskState taskState) throws Exception { + super.restoreState(taskState); + + + @SuppressWarnings("unchecked") + StateHandle inputState = (StateHandle) taskState.getOperatorState(); + DataInputView in = inputState.getState(getUserCodeClassloader()); + + int numWindows = in.readInt(); + this.windows = new HashMap<>(numWindows); + this.processingTimeTimers = new HashMap<>(); + this.watermarkTimers = new HashMap<>(); 
+ + for (int j = 0; j < numWindows; j++) { + Context context = new Context(in); + windows.put(context.window, context); + } + } + + + // ------------------------------------------------------------------------ // Getters for testing // ------------------------------------------------------------------------ @VisibleForTesting - public Trigger getTriggerTemplate() { - return triggerTemplate; + public boolean isSetProcessingTime() { + return setProcessingTime; + } + + @VisibleForTesting + public Trigger getTrigger() { + return trigger; } @VisibleForTesting @@ -300,9 +536,4 @@ public class NonKeyedWindowOperator public WindowBufferFactory> getWindowBufferFactory() { return windowBufferFactory; } - - @VisibleForTesting - public boolean isSetProcessingTime() { - return setProcessingTime; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 82a3f9a..0b3274f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -18,13 +18,16 @@ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.annotations.VisibleForTesting; - +import org.apache.commons.lang.SerializationUtils; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorState; import 
org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -38,10 +41,16 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -84,49 +93,77 @@ public class WindowOperator private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class); + // ------------------------------------------------------------------------ + // Configuration values and user functions + // ------------------------------------------------------------------------ + private final WindowAssigner windowAssigner; private final KeySelector keySelector; - private final Trigger triggerTemplate; + private final Trigger trigger; private final 
WindowBufferFactory> windowBufferFactory; /** - * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer} - * and a {@code TriggerContext} that stores the {@code Trigger} for that pane. + * If this is true. The current processing time is set as the timestamp of incoming elements. + * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} + * if eviction should happen based on processing time. + */ + private boolean setProcessingTime = false; + + /** + * This is used to copy the incoming element because it can be put into several window + * buffers. + */ + private TypeSerializer inputSerializer; + + /** + * For serializing the key in checkpoints. */ - protected transient Map, TriggerContext>>> windows; + private final TypeSerializer keySerializer; + + /** + * For serializing the window in checkpoints. + */ + private final TypeSerializer windowSerializer; + + // ------------------------------------------------------------------------ + // State that is not checkpointed + // ------------------------------------------------------------------------ /** * Processing time timers that are currently in-flight. */ - private transient Map> processingTimeTimers; + private transient Map> processingTimeTimers; /** * Current waiting watermark callbacks. */ - private transient Map> watermarkTimers; + private transient Map> watermarkTimers; /** * This is given to the {@code WindowFunction} for emitting elements with a given timestamp. */ protected transient TimestampedCollector timestampedCollector; + // ------------------------------------------------------------------------ + // State that needs to be checkpointed + // ------------------------------------------------------------------------ + /** - * If this is true. The current processing time is set as the timestamp of incoming elements. 
- * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} - * if eviction should happen based on processing time. + * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer} + * and a {@code TriggerContext} that stores the {@code Trigger} for that pane. */ - private boolean setProcessingTime = false; - - private TypeSerializer inputSerializer; + protected transient Map> windows; /** * Creates a new {@code WindowOperator} based on the given policies and user functions. */ public WindowOperator(WindowAssigner windowAssigner, + TypeSerializer windowSerializer, KeySelector keySelector, + TypeSerializer keySerializer, WindowBufferFactory> windowBufferFactory, WindowFunction windowFunction, Trigger trigger) { @@ -134,27 +171,26 @@ public class WindowOperator super(windowFunction); this.windowAssigner = requireNonNull(windowAssigner); + this.windowSerializer = windowSerializer; this.keySelector = requireNonNull(keySelector); + this.keySerializer = requireNonNull(keySerializer); this.windowBufferFactory = requireNonNull(windowBufferFactory); - this.triggerTemplate = requireNonNull(trigger); + this.trigger = requireNonNull(trigger); setChainingStrategy(ChainingStrategy.ALWAYS); -// forceInputCopy(); } @Override @SuppressWarnings("unchecked") - public void setInputType(TypeInformation type, ExecutionConfig executionConfig) { + public final void setInputType(TypeInformation type, ExecutionConfig executionConfig) { inputSerializer = (TypeSerializer) type.createSerializer(executionConfig); } @Override - public void open() throws Exception { + public final void open() throws Exception { super.open(); - windows = new HashMap<>(); - watermarkTimers = new HashMap<>(); - processingTimeTimers = new HashMap<>(); + timestampedCollector = new TimestampedCollector<>(output); if (inputSerializer == null) { @@ -163,17 +199,53 @@ public class WindowOperator windowBufferFactory.setRuntimeContext(getRuntimeContext()); 
windowBufferFactory.open(getUserFunctionParameters()); + + + // these could already be initialized from restoreState() + if (watermarkTimers == null) { + watermarkTimers = new HashMap<>(); + } + if (processingTimeTimers == null) { + processingTimeTimers = new HashMap<>(); + } + if (windows == null) { + windows = new HashMap<>(); + } + + // re-register timers that this window context had set + for (Map.Entry> entry: windows.entrySet()) { + Map keyWindows = entry.getValue(); + for (Context context: keyWindows.values()) { + if (context.processingTimeTimer > 0) { + Set triggers = processingTimeTimers.get(context.processingTimeTimer); + if (triggers == null) { + getRuntimeContext().registerTimer(context.processingTimeTimer, WindowOperator.this); + triggers = new HashSet<>(); + processingTimeTimers.put(context.processingTimeTimer, triggers); + } + triggers.add(context); + } + if (context.watermarkTimer > 0) { + Set triggers = watermarkTimers.get(context.watermarkTimer); + if (triggers == null) { + triggers = new HashSet<>(); + watermarkTimers.put(context.watermarkTimer, triggers); + } + triggers.add(context); + } + + } + } } @Override - public void close() throws Exception { + public final void close() throws Exception { super.close(); // emit the elements that we still keep - for (Map.Entry, TriggerContext>>> entry: windows.entrySet()) { - K key = entry.getKey(); - Map, TriggerContext>> keyWindows = entry.getValue(); - for (W window: keyWindows.keySet()) { - emitWindow(key, window, false); + for (Map.Entry> entry: windows.entrySet()) { + Map keyWindows = entry.getValue(); + for (Context window: keyWindows.values()) { + emitWindow(window); } } windows.clear(); @@ -182,77 +254,81 @@ public class WindowOperator @Override @SuppressWarnings("unchecked") - public void processElement(StreamRecord element) throws Exception { + public final void processElement(StreamRecord element) throws Exception { if (setProcessingTime) { element.replace(element.getValue(), 
System.currentTimeMillis()); } + Collection elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp()); K key = keySelector.getKey(element.getValue()); - Map, TriggerContext>> keyWindows = windows.get(key); + Map keyWindows = windows.get(key); if (keyWindows == null) { keyWindows = new HashMap<>(); windows.put(key, keyWindows); } for (W window: elementWindows) { - Tuple2, TriggerContext> bufferAndTrigger = keyWindows.get(window); - if (bufferAndTrigger == null) { - bufferAndTrigger = new Tuple2<>(); - bufferAndTrigger.f0 = windowBufferFactory.create(); - bufferAndTrigger.f1 = new TriggerContext(key, window, triggerTemplate.duplicate()); - keyWindows.put(window, bufferAndTrigger); + Context context = keyWindows.get(window); + if (context == null) { + WindowBuffer windowBuffer = windowBufferFactory.create(); + context = new Context(key, window, windowBuffer); + keyWindows.put(window, context); } StreamRecord elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()); - bufferAndTrigger.f0.storeElement(elementCopy); - Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1); + context.windowBuffer.storeElement(elementCopy); + Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context); processTriggerResult(triggerResult, key, window); } } - protected void emitWindow(K key, W window, boolean purge) throws Exception { - timestampedCollector.setTimestamp(window.getEnd()); - - Map, TriggerContext>> keyWindows = windows.get(key); - - if (keyWindows == null) { - LOG.debug("Window {} for key {} already gone.", window, key); - return; - } - - Tuple2, TriggerContext> bufferAndTrigger; - if (purge) { - bufferAndTrigger = keyWindows.remove(window); - } else { - bufferAndTrigger = keyWindows.get(window); - } - - if (bufferAndTrigger == null) { - 
LOG.debug("Window {} for key {} already gone.", window, key); - return; - } + protected void emitWindow(Context context) throws Exception { + timestampedCollector.setTimestamp(context.window.maxTimestamp()); - - userFunction.apply(key, - window, - bufferAndTrigger.f0.getUnpackedElements(), + userFunction.apply(context.key, + context.window, + context.windowBuffer.getUnpackedElements(), timestampedCollector); - - if (keyWindows.isEmpty()) { - windows.remove(key); - } } private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception { switch (triggerResult) { - case FIRE: - emitWindow(key, window, false); + case FIRE: { + Map keyWindows = windows.get(key); + if (keyWindows == null) { + LOG.debug("Window {} for key {} already gone.", window, key); + return; + } + Context context = keyWindows.get(window); + if (context == null) { + LOG.debug("Window {} for key {} already gone.", window, key); + return; + } + + + emitWindow(context); break; + } - case FIRE_AND_PURGE: - emitWindow(key, window, true); + case FIRE_AND_PURGE: { + Map keyWindows = windows.get(key); + if (keyWindows == null) { + LOG.debug("Window {} for key {} already gone.", window, key); + return; + } + Context context = keyWindows.remove(window); + if (context == null) { + LOG.debug("Window {} for key {} already gone.", window, key); + return; + } + if (keyWindows.isEmpty()) { + windows.remove(key); + } + + emitWindow(context); break; + } case CONTINUE: // ingore @@ -260,14 +336,14 @@ public class WindowOperator } @Override - public void processWatermark(Watermark mark) throws Exception { + public final void processWatermark(Watermark mark) throws Exception { Set toRemove = new HashSet<>(); - for (Map.Entry> triggers: watermarkTimers.entrySet()) { + for (Map.Entry> triggers: watermarkTimers.entrySet()) { if (triggers.getKey() <= mark.getTimestamp()) { - for (TriggerContext trigger: triggers.getValue()) { - Trigger.TriggerResult triggerResult = 
trigger.trigger.onTime(mark.getTimestamp(), trigger); - processTriggerResult(triggerResult, trigger.key, trigger.window); + for (Context context: triggers.getValue()) { + Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey()); + processTriggerResult(triggerResult, context.key, context.window); } toRemove.add(triggers.getKey()); } @@ -280,14 +356,14 @@ public class WindowOperator } @Override - public void trigger(long time) throws Exception { + public final void trigger(long time) throws Exception { Set toRemove = new HashSet<>(); - for (Map.Entry> triggers: processingTimeTimers.entrySet()) { + for (Map.Entry> triggers: processingTimeTimers.entrySet()) { if (triggers.getKey() < time) { - for (TriggerContext trigger: triggers.getValue()) { - Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger); - processTriggerResult(triggerResult, trigger.key, trigger.window); + for (Context context: triggers.getValue()) { + Trigger.TriggerResult triggerResult = context.onProcessingTime(time); + processTriggerResult(triggerResult, context.key, context.window); } toRemove.add(triggers.getKey()); } @@ -302,37 +378,146 @@ public class WindowOperator * A context object that is given to {@code Trigger} functions to allow them to register * timer/watermark callbacks. */ - protected class TriggerContext implements Trigger.TriggerContext { - Trigger trigger; - K key; - W window; + protected class Context implements Trigger.TriggerContext { + protected K key; + protected W window; + + protected WindowBuffer windowBuffer; - public TriggerContext(K key, W window, Trigger trigger) { + protected HashMap state; + + // use these to only allow one timer in flight at a time of each type + // if the trigger registers another timer this value here will be overwritten, + // the timer is not removed from the set of in-flight timers to improve performance. + // When a trigger fires it is just checked against the last timer that was set. 
+ protected long watermarkTimer; + protected long processingTimeTimer; + + public Context(K key, + W window, + WindowBuffer windowBuffer) { this.key = key; this.window = window; - this.trigger = trigger; + this.windowBuffer = windowBuffer; + state = new HashMap<>(); + + this.watermarkTimer = -1; + this.processingTimeTimer = -1; + } + + /** + * Constructs a new {@code Context} by reading from a {@link DataInputView} that + * contains a serialized context that we wrote in + * {@link #writeToState(StateBackend.CheckpointStateOutputView)} + */ + @SuppressWarnings("unchecked") + protected Context(DataInputView in) throws Exception { + this.key = keySerializer.deserialize(in); + this.window = windowSerializer.deserialize(in); + this.watermarkTimer = in.readLong(); + this.processingTimeTimer = in.readLong(); + + int stateSize = in.readInt(); + byte[] stateData = new byte[stateSize]; + in.read(stateData); + ByteArrayInputStream bais = new ByteArrayInputStream(stateData); + state = (HashMap) SerializationUtils.deserialize(bais); + + this.windowBuffer = windowBufferFactory.create(); + int numElements = in.readInt(); + MultiplexingStreamRecordSerializer recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); + for (int i = 0; i < numElements; i++) { + windowBuffer.storeElement(recordSerializer.deserialize(in).asRecord()); + } + } + + /** + * Writes the {@code Context} to the given state checkpoint output. 
+ */ + protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException { + keySerializer.serialize(key, out); + windowSerializer.serialize(window, out); + out.writeLong(watermarkTimer); + out.writeLong(processingTimeTimer); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + SerializationUtils.serialize(state, baos); + out.writeInt(baos.size()); + out.write(baos.toByteArray(), 0, baos.size()); + + MultiplexingStreamRecordSerializer recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); + out.writeInt(windowBuffer.size()); + for (StreamRecord element: windowBuffer.getElements()) { + recordSerializer.serialize(element, out); + } + } + + @SuppressWarnings("unchecked") + public OperatorState getKeyValueState(final String name, final S defaultState) { + return new OperatorState() { + @Override + public S value() throws IOException { + Serializable value = state.get(name); + if (value == null) { + state.put(name, defaultState); + value = defaultState; + } + return (S) value; + } + + @Override + public void update(S value) throws IOException { + state.put(name, value); + } + }; } @Override public void registerProcessingTimeTimer(long time) { - Set triggers = processingTimeTimers.get(time); + if (this.processingTimeTimer == time) { + // we already have set a trigger for that time + return; + } + Set triggers = processingTimeTimers.get(time); if (triggers == null) { getRuntimeContext().registerTimer(time, WindowOperator.this); triggers = new HashSet<>(); processingTimeTimers.put(time, triggers); } + this.processingTimeTimer = time; triggers.add(this); } @Override public void registerWatermarkTimer(long time) { - Set triggers = watermarkTimers.get(time); + if (watermarkTimer == time) { + // we already have set a trigger for that time + return; + } + Set triggers = watermarkTimers.get(time); if (triggers == null) { triggers = new HashSet<>(); watermarkTimers.put(time, triggers); } + this.watermarkTimer = time; 
triggers.add(this); } + + public Trigger.TriggerResult onProcessingTime(long time) throws Exception { + if (time == processingTimeTimer) { + return trigger.onTime(time, this); + } else { + return Trigger.TriggerResult.CONTINUE; + } + } + + public Trigger.TriggerResult onEventTime(long time) throws Exception { + if (time == watermarkTimer) { + return trigger.onTime(time, this); + } else { + return Trigger.TriggerResult.CONTINUE; + } + } } /** @@ -347,7 +532,7 @@ public class WindowOperator } @Override - public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + public final void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { if (userFunction instanceof OutputTypeConfigurable) { @SuppressWarnings("unchecked") OutputTypeConfigurable typeConfigurable = (OutputTypeConfigurable) userFunction; @@ -356,6 +541,60 @@ public class WindowOperator } // ------------------------------------------------------------------------ + // Checkpointing + // ------------------------------------------------------------------------ + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + + // we write the panes with the key/value maps into the stream + StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + + int numKeys = windows.size(); + out.writeInt(numKeys); + + for (Map.Entry> keyWindows: windows.entrySet()) { + int numWindows = keyWindows.getValue().size(); + out.writeInt(numWindows); + for (Context context: keyWindows.getValue().values()) { + context.writeToState(out); + } + } + + taskState.setOperatorState(out.closeAndGetHandle()); + return taskState; + } + + @Override + public void restoreState(StreamTaskState taskState) throws Exception { + super.restoreState(taskState); + + + @SuppressWarnings("unchecked") + 
StateHandle inputState = (StateHandle) taskState.getOperatorState(); + DataInputView in = inputState.getState(getUserCodeClassloader()); + + int numKeys = in.readInt(); + this.windows = new HashMap<>(numKeys); + this.processingTimeTimers = new HashMap<>(); + this.watermarkTimers = new HashMap<>(); + + for (int i = 0; i < numKeys; i++) { + int numWindows = in.readInt(); + for (int j = 0; j < numWindows; j++) { + Context context = new Context(in); + Map keyWindows = windows.get(context.key); + if (keyWindows == null) { + keyWindows = new HashMap<>(numWindows); + windows.put(context.key, keyWindows); + } + keyWindows.put(context.window, context); + } + } + } + + // ------------------------------------------------------------------------ // Getters for testing // ------------------------------------------------------------------------ @@ -365,8 +604,8 @@ public class WindowOperator } @VisibleForTesting - public Trigger getTriggerTemplate() { - return triggerTemplate; + public Trigger getTrigger() { + return trigger; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 4fa16ac..45ef29f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -71,7 
+71,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator); NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1; Assert.assertFalse(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); @@ -94,7 +94,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator); NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2; Assert.assertFalse(winOperator2.isSetProcessingTime()); - Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } @@ -119,7 +119,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator); NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1; Assert.assertTrue(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); @@ -143,7 +143,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator); NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2; Assert.assertTrue(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } @@ -168,7 +168,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator); EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1; Assert.assertFalse(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); @@ -194,7 +194,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator); EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2; Assert.assertFalse(winOperator2.isSetProcessingTime()); - Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof 
HeapWindowBuffer.Factory); http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java index 3139941..39033cc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java @@ -54,6 +54,7 @@ public class EvictingNonKeyedWindowOperatorTest { EvictingNonKeyedWindowOperator, Tuple2, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>( GlobalWindows.create(), + new GlobalWindow.Serializer(), new HeapWindowBuffer.Factory>(), new ReduceAllWindowFunction>(new SumReducer(closeCalled)), CountTrigger.of(WINDOW_SLIDE), http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index 3d9605e..afc65d5 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -17,8 +17,10 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; +import jdk.nashorn.internal.objects.Global; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -54,7 +56,9 @@ public class EvictingWindowOperatorTest { EvictingWindowOperator, Tuple2, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), + new GlobalWindow.Serializer(), new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new HeapWindowBuffer.Factory>(), new ReduceWindowFunction>(new SumReducer(closeCalled)), CountTrigger.of(WINDOW_SLIDE), http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java index 6cc8931..a91d957 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java @@ -76,6 +76,7 @@ public class NonKeyedWindowOperatorTest { NonKeyedWindowOperator, Tuple2, TimeWindow> operator = new NonKeyedWindowOperator<>( SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), windowBufferFactory, new ReduceAllWindowFunction>(new SumReducer()), WatermarkTrigger.create()); @@ -156,6 +157,7 @@ public class NonKeyedWindowOperatorTest { NonKeyedWindowOperator, Tuple2, TimeWindow> operator = new NonKeyedWindowOperator<>( TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), windowBufferFactory, new ReduceAllWindowFunction>(new SumReducer()), WatermarkTrigger.create()); @@ -234,6 +236,7 @@ public class NonKeyedWindowOperatorTest { NonKeyedWindowOperator, Tuple2, GlobalWindow> operator = new NonKeyedWindowOperator<>( GlobalWindows.create(), + new GlobalWindow.Serializer(), windowBufferFactory, new ReduceAllWindowFunction>(new SumReducer()), ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); @@ -312,6 +315,7 @@ public class NonKeyedWindowOperatorTest { NonKeyedWindowOperator, Tuple2, GlobalWindow> operator = new NonKeyedWindowOperator<>( GlobalWindows.create(), + new GlobalWindow.Serializer(), windowBufferFactory, new ReduceAllWindowFunction>(new SumReducer()), PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE))); http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index d387df0..e825b88 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -76,7 +77,9 @@ public class WindowOperatorTest { WindowOperator, Tuple2, TimeWindow> operator = new WindowOperator<>( SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowBufferFactory, new ReduceWindowFunction>(new SumReducer()), WatermarkTrigger.create()); @@ -163,7 +166,9 @@ public class WindowOperatorTest { WindowOperator, Tuple2, TimeWindow> operator = new WindowOperator<>( TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowBufferFactory, new ReduceWindowFunction>(new SumReducer()), WatermarkTrigger.create()); @@ -246,7 +251,9 @@ public class WindowOperatorTest { WindowOperator, Tuple2, GlobalWindow> operator = new WindowOperator<>( GlobalWindows.create(), + new GlobalWindow.Serializer(), new TupleKeySelector(), + 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowBufferFactory, new ReduceWindowFunction>(new SumReducer()), ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); @@ -331,7 +338,9 @@ public class WindowOperatorTest { WindowOperator, Tuple2, GlobalWindow> operator = new WindowOperator<>( GlobalWindows.create(), + new GlobalWindow.Serializer(), new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowBufferFactory, new ReduceWindowFunction>(new SumReducer()), PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE))); http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 10fe734..02ec820 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -116,7 +116,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator1 instanceof WindowOperator); WindowOperator winOperator1 = (WindowOperator) operator1; Assert.assertFalse(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger); 
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); @@ -140,7 +140,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator2 instanceof WindowOperator); WindowOperator winOperator2 = (WindowOperator) operator2; Assert.assertFalse(winOperator2.isSetProcessingTime()); - Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); } @@ -166,7 +166,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator1 instanceof WindowOperator); WindowOperator winOperator1 = (WindowOperator) operator1; Assert.assertTrue(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); @@ -191,7 +191,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator2 instanceof WindowOperator); WindowOperator winOperator2 = (WindowOperator) operator2; Assert.assertTrue(winOperator2.isSetProcessingTime()); - Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof 
HeapWindowBuffer.Factory); } @@ -217,7 +217,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator1 instanceof EvictingWindowOperator); EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1; Assert.assertFalse(winOperator1.isSetProcessingTime()); - Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger); + Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); @@ -244,7 +244,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(operator2 instanceof EvictingWindowOperator); EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2; Assert.assertFalse(winOperator2.isSetProcessingTime()); - Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index 950b0f5..60b7894 
100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.examples.windowing; +import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -97,8 +98,7 @@ public class SessionWindowing { private static final long serialVersionUID = 1L; - private volatile Long lastSeenEvent = 1L; - private Long sessionTimeout; + private final Long sessionTimeout; public SessionTrigger(Long sessionTimeout) { this.sessionTimeout = sessionTimeout; @@ -106,13 +106,17 @@ public class SessionWindowing { } @Override - public TriggerResult onElement(Tuple3 element, long timestamp, GlobalWindow window, TriggerContext ctx) { - Long timeSinceLastEvent = timestamp - lastSeenEvent; + public TriggerResult onElement(Tuple3 element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception { + + OperatorState lastSeenState = ctx.getKeyValueState("last-seen", 1L); + Long lastSeen = lastSeenState.value(); + + Long timeSinceLastEvent = timestamp - lastSeen; // Update the last seen event time - lastSeenEvent = timestamp; + lastSeenState.update(timestamp); - ctx.registerWatermarkTimer(lastSeenEvent + sessionTimeout); + ctx.registerWatermarkTimer(lastSeen + sessionTimeout); if (timeSinceLastEvent > sessionTimeout) { return TriggerResult.FIRE_AND_PURGE; @@ -122,17 +126,15 @@ public class SessionWindowing { } @Override - public TriggerResult onTime(long time, TriggerContext ctx) { - if (time - lastSeenEvent >= sessionTimeout) { + public TriggerResult onTime(long time, TriggerContext ctx) throws Exception { + OperatorState lastSeenState = 
ctx.getKeyValueState("last-seen", 1L); + Long lastSeen = lastSeenState.value(); + + if (time - lastSeen >= sessionTimeout) { return TriggerResult.FIRE_AND_PURGE; } return TriggerResult.CONTINUE; } - - @Override - public SessionTrigger duplicate() { - return new SessionTrigger(sessionTimeout); - } } // ************************************************************************* http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 33104ab..0357144 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -202,6 +202,58 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { javaStream.apply(applyFunction, implicitly[TypeInformation[R]]) } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + def apply[R: TypeInformation: ClassTag]( + preAggregator: ReduceFunction[T], + function: AllWindowFunction[T, R, W]): DataStream[R] = { + javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]]) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def apply[R: TypeInformation: ClassTag]( + preAggregator: (T, T) => T, + function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + if (function == null) { + throw new NullPointerException("Reduce function must not be null.") + } + if (function == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanReducer = clean(preAggregator) + val reducer = new ReduceFunction[T] { + def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) } + } + + val cleanApply = clean(function) + val applyFunction = new AllWindowFunction[T, R, W] { + def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanApply(window, elements.asScala, out) + } + } + javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]) + } + // ------------------------------------------------------------------------ // Aggregations on the keyed windows // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index d4f4618..93b91ff 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -196,6 +196,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { */ def apply[R: TypeInformation: ClassTag]( function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + if (function == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + val cleanedFunction = clean(function) val applyFunction = new WindowFunction[T, R, K, W] { def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { @@ -205,6 +209,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { javaStream.apply(applyFunction, implicitly[TypeInformation[R]]) } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + def apply[R: TypeInformation: ClassTag]( + preAggregator: ReduceFunction[T], + function: WindowFunction[T, R, K, W]): DataStream[R] = { + javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]]) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * + * @param preAggregator The reduce function that is used for pre-aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def apply[R: TypeInformation: ClassTag]( + preAggregator: (T, T) => T, + function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + if (preAggregator == null) { + throw new NullPointerException("Reduce function must not be null.") + } + if (function == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanReducer = clean(preAggregator) + val reducer = new ReduceFunction[T] { + def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) } + } + + val cleanApply = clean(function) + val applyFunction = new WindowFunction[T, R, K, W] { + def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanApply(key, window, elements.asScala, out) + } + } + javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]) + } + // ------------------------------------------------------------------------ // Aggregations on the keyed windows // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index 99fcd07..7da7bc3 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -22,8 +22,9 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit import org.apache.flink.api.common.functions.RichReduceFunction +import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction +import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} @@ -111,7 +112,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]] - assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue( winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) @@ -134,7 +135,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase { assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]] - assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) } @@ -161,7 +162,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]) val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]] - assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger]) assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) @@ -185,11 +186,68 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]) val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]] - assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) } + + @Test + def testPreReduce(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + // windowAll + pre-aggregating apply must build a NonKeyedWindowOperator. + val window1 = 
source + 
.windowAll(SlidingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) + .trigger(CountTrigger.of(100)) + .apply(reducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { + def apply( + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform1 = window1.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) + val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]] + assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) + assertTrue( + winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + + // Same translation checks for a tumbling window assigner. + val window2 = source + .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(100)) + .apply(reducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { + def apply( + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) + val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]] + assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) + assertTrue( + winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + } + } // ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 65f978c..46981ab 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -108,7 +108,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]]) val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]] - assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue( winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) @@ -133,7 +133,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]]) val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]] - assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) } @@ -161,7 +161,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase { assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]]) val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]] - assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger]) + assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger]) assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) @@ -187,9 +187,69 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]]) val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]] - assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) } + + @Test + def testPreReduce(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + // Keyed apply with a pre-aggregator must use a PreAggregatingHeapWindowBuffer. + val window1 = source + .keyBy(0) + .window(SlidingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) + .trigger(CountTrigger.of(100)) + .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + def apply( + tuple: Tuple, + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform1 = window1.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = 
transform1.getOperator + 
assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]]) + val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) + assertTrue( + winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + + // Same translation checks for a tumbling window assigner. + val window2 = source + .keyBy(0) + .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .trigger(CountTrigger.of(100)) + .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + def apply( + tuple: Tuple, + window: TimeWindow, + values: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.getJavaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]]) + val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) + assertTrue( + winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + } }