From commits-return-347-archive-asf-public=cust-asf.ponee.io@nemo.apache.org Fri Nov 2 04:27:02 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 878F8180652 for ; Fri, 2 Nov 2018 04:27:01 +0100 (CET) Received: (qmail 59162 invoked by uid 500); 2 Nov 2018 03:27:00 -0000 Mailing-List: contact commits-help@nemo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nemo.apache.org Delivered-To: mailing list commits@nemo.apache.org Received: (qmail 59153 invoked by uid 99); 2 Nov 2018 03:27:00 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Nov 2018 03:27:00 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id DB936870B2; Fri, 2 Nov 2018 03:26:59 +0000 (UTC) Date: Fri, 02 Nov 2018 03:26:59 +0000 To: "commits@nemo.apache.org" Subject: [incubator-nemo] branch master updated: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data (#141) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154112921982.14466.530880720636379659@gitbox.apache.org> From: johnyangk@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-nemo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 6e4a317324be11cf10c5a640750dcb539f19945a X-Git-Newrev: a726a5eda3758acfae5d0c6ebee830aecd98a2c3 X-Git-Rev: a726a5eda3758acfae5d0c6ebee830aecd98a2c3 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. johnyangk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git The following commit(s) were added to refs/heads/master by this push: new a726a5e [NEMO-252] Fix CreatViewTransform to emit windowed materialized data (#141) a726a5e is described below commit a726a5eda3758acfae5d0c6ebee830aecd98a2c3 Author: Taegeon Um AuthorDate: Fri Nov 2 12:26:55 2018 +0900 [NEMO-252] Fix CreatViewTransform to emit windowed materialized data (#141) JIRA: [NEMO-252: Fix CreatViewTransform to emit windowed materialized data](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-252) **Major changes:** - Fix `CreateViewTransform` to collect windowed data and emit them by applying a view function **Minor changes** - Fix emitting output watermarks in `GroupByKeyAndWindowDoFnTransform` **Tests for the changes:** - `CreateViewTransformTest` that tests materialized data in windows --- .../compiler/frontend/beam/PipelineTranslator.java | 2 +- .../beam/transform/CreateViewTransform.java | 108 ++++++++++----- .../GroupByKeyAndWindowDoFnTransform.java | 79 ++++++----- .../beam/transform/CreateViewTransformTest.java | 150 +++++++++++++++++++++ 4 files changed, 278 insertions(+), 61 deletions(-) diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java index 7dc7af6..ee10b21 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java @@ -272,7 +272,7 @@ public final class PipelineTranslator private static void createPCollectionViewTranslator(final TranslationContext ctx, final PrimitiveTransformVertex transformVertex, final View.CreatePCollectionView transform) { - final IRVertex vertex = new OperatorVertex(new CreateViewTransform<>(transform.getView())); + final IRVertex vertex = new OperatorVertex(new CreateViewTransform(transform.getView().getViewFn())); ctx.addVertex(vertex); transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input)); ctx.registerMainOutputFrom(vertex, transform.getView()); diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java index d60bcfc..05e5af6 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java @@ -18,37 +18,43 @@ */ package org.apache.nemo.compiler.frontend.beam.transform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; import javax.annotation.Nullable; import java.io.Serializable; -import java.util.ArrayList; +import java.util.*; /** - * CreateView transform implementation. - * @param input type. - * @param output type. + * This transforms emits materialized data for each window. + * @param input type + * @param materialized output type */ -public final class CreateViewTransform extends NoWatermarkEmitTransform, WindowedValue> { - private final PCollectionView pCollectionView; +public final class CreateViewTransform implements + Transform>, WindowedValue> { private OutputCollector> outputCollector; private final ViewFn, O> viewFn; - private final MultiView multiView; + private final Map> windowListMap; + + // TODO #259: we can remove this variable by implementing ReadyCheckingSideInputReader + private boolean isEmitted = false; + private long currentOutputWatermark; /** * Constructor of CreateViewTransform. - * @param pCollectionView the pCollectionView to create. + * @param viewFn the viewFn that materializes data. */ - public CreateViewTransform(final PCollectionView pCollectionView) { - this.pCollectionView = pCollectionView; - this.viewFn = this.pCollectionView.getViewFn(); - this.multiView = new MultiView<>(); + public CreateViewTransform(final ViewFn, O> viewFn) { + this.viewFn = viewFn; + this.windowListMap = new HashMap<>(); + this.currentOutputWatermark = Long.MIN_VALUE; } @Override @@ -57,23 +63,69 @@ public final class CreateViewTransform extends NoWatermarkEmitTransform element) { - // TODO #216: support window in view - final KV kv = ((WindowedValue) element).getValue(); - multiView.getDataList().add(kv.getValue()); + public void onData(final WindowedValue> element) { + // The key of element is always null (beam's semantic) + // because view is a globally materialized data regardless of key + for (final BoundedWindow window : element.getWindows()) { + windowListMap.putIfAbsent(window, new ArrayList<>()); + final List list = windowListMap.get(window); + list.add(element.getValue().getValue()); + } + } + + @Override + public void onWatermark(final Watermark inputWatermark) { + + // If no data, just forwards the watermark + if (windowListMap.size() == 0 && currentOutputWatermark < inputWatermark.getTimestamp()) { + currentOutputWatermark = inputWatermark.getTimestamp(); + outputCollector.emitWatermark(inputWatermark); + return; + } + + final Iterator>> iterator = windowListMap.entrySet().iterator(); + long minOutputTimestampOfEmittedWindows = Long.MAX_VALUE; + + while (iterator.hasNext()) { + final Map.Entry> entry = iterator.next(); + if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) { + // emit the windowed data if the watermark timestamp > the window max boundary + final O view = viewFn.apply(new MultiView<>(entry.getValue())); + outputCollector.emit(WindowedValue.of( + view, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING)); + iterator.remove(); + isEmitted = true; + + minOutputTimestampOfEmittedWindows = + Math.min(minOutputTimestampOfEmittedWindows, entry.getKey().maxTimestamp().getMillis()); + } + } + + if (minOutputTimestampOfEmittedWindows != Long.MAX_VALUE + && currentOutputWatermark < minOutputTimestampOfEmittedWindows) { + // update current output watermark and emit to next operators + currentOutputWatermark = minOutputTimestampOfEmittedWindows; + outputCollector.emitWatermark(new Watermark(currentOutputWatermark)); + } } @Override public void close() { - final Object view = viewFn.apply(multiView); - // TODO #216: support window in view - outputCollector.emit(WindowedValue.valueInGlobalWindow((O) view)); + onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + + if (!isEmitted) { + // TODO #259: This is an ad-hoc code to resolve the view that has no data + // Currently, broadCastWorker reads the view data, but it throws exception if no data is available for a view. + // We should use watermark value to track whether the materialized data in a view is available or not. + final O view = viewFn.apply(new MultiView<>(Collections.emptyList())); + outputCollector.emit(WindowedValue.valueInGlobalWindow(view)); + } } @Override public String toString() { final StringBuilder sb = new StringBuilder(); - sb.append("CreateViewTransform:" + pCollectionView); + sb.append("CreateViewTransform:" + viewFn); return sb.toString(); } @@ -82,23 +134,19 @@ public final class CreateViewTransform extends NoWatermarkEmitTransform primitive view type */ public final class MultiView implements Materializations.MultimapView, Serializable { - private final ArrayList dataList; + private final Iterable iterable; /** * Constructor. */ - MultiView() { + MultiView(final Iterable iterable) { // Create a placeholder for side input data. CreateViewTransform#onData stores data to this list. - dataList = new ArrayList<>(); + this.iterable = iterable; } @Override public Iterable get(@Nullable final Void aVoid) { - return dataList; - } - - public ArrayList getDataList() { - return dataList; + return iterable; } } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 7c44b76..7d20f26 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -48,6 +48,7 @@ public final class GroupByKeyAndWindowDoFnTransform private final Map>> keyToValues; private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory; private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory; + private long currentOutputWatermark; /** * GroupByKey constructor. @@ -69,6 +70,7 @@ public final class GroupByKeyAndWindowDoFnTransform options); this.keyToValues = new HashMap<>(); this.reduceFn = reduceFn; + this.currentOutputWatermark = Long.MIN_VALUE; } /** @@ -113,34 +115,50 @@ public final class GroupByKeyAndWindowDoFnTransform /** * Process the collected data and trigger timers. - * @param watermark current watermark + * @param inputWatermark current input watermark * @param processingTime processing time * @param synchronizedTime synchronized time */ - private void processElementsAndTriggerTimers(final Watermark watermark, + private void processElementsAndTriggerTimers(final Watermark inputWatermark, final Instant processingTime, final Instant synchronizedTime) { - keyToValues.forEach((key, val) -> { + long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE; + + for (final Map.Entry>> entry : keyToValues.entrySet()) { + final K key = entry.getKey(); + final List> values = entry.getValue(); + // for each key // Process elements - if (!val.isEmpty()) { + if (!values.isEmpty()) { final KeyedWorkItem keyedWorkItem = - KeyedWorkItems.elementsWorkItem(key, val); + KeyedWorkItems.elementsWorkItem(key, values); // The DoFnRunner interface requires WindowedValue, // but this windowed value is actually not used in the ReduceFnRunner internal. getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem)); } // Trigger timers - triggerTimers(key, watermark, processingTime, synchronizedTime); + final long minOutputTimestamp = + triggerTimers(key, inputWatermark, processingTime, synchronizedTime); + + minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp); + // Remove values - val.clear(); - }); + values.clear(); + } + + // Emit watermark to downstream operators + if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE + && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) { + currentOutputWatermark = minOutputTimestampsOfEmittedWindows; + getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows)); + } } @Override - public void onWatermark(final Watermark watermark) { - processElementsAndTriggerTimers(watermark, Instant.now(), Instant.now()); + public void onWatermark(final Watermark inputWatermark) { + processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now()); } /** @@ -161,8 +179,10 @@ public final class GroupByKeyAndWindowDoFnTransform * @param watermark watermark * @param processingTime processing time * @param synchronizedTime synchronized time + * @return the minimum output timestamp. + * If no data is emitted, it returns Long.MAX_VALUE. */ - private void triggerTimers(final K key, + private long triggerTimers(final K key, final Watermark watermark, final Instant processingTime, final Instant synchronizedTime) { @@ -179,28 +199,27 @@ public final class GroupByKeyAndWindowDoFnTransform final List timerDataList = getEligibleTimers(timerInternals); if (timerDataList.isEmpty()) { - return; - } + return Long.MAX_VALUE; + } else { - // Trigger timers and emit windowed data - final KeyedWorkItem timerWorkItem = - KeyedWorkItems.timersWorkItem(key, timerDataList); - // The DoFnRunner interface requires WindowedValue, - // but this windowed value is actually not used in the ReduceFnRunner internal. - getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem)); - - // output watermark - // we set output watermark to the minimum of the timer data - long outputWatermark = Long.MAX_VALUE; - for (final TimerInternals.TimerData timer : timerDataList) { - if (outputWatermark > timer.getTimestamp().getMillis()) { - outputWatermark = timer.getTimestamp().getMillis(); + // Trigger timers and emit windowed data + final KeyedWorkItem timerWorkItem = + KeyedWorkItems.timersWorkItem(key, timerDataList); + // The DoFnRunner interface requires WindowedValue, + // but this windowed value is actually not used in the ReduceFnRunner internal. + getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem)); + + // output watermark + // we set output watermark to the minimum of the timer data + long keyOutputTimestamp = Long.MAX_VALUE; + for (final TimerInternals.TimerData timer : timerDataList) { + keyOutputTimestamp = Math.min(keyOutputTimestamp, timer.getTimestamp().getMillis()); } - } - // Emit watermark to downstream operators - timerInternals.advanceOutputWatermark(new Instant(outputWatermark)); - getOutputCollector().emitWatermark(new Watermark(outputWatermark)); + timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp)); + + return keyOutputTimestamp; + } } @Override diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java new file mode 100644 index 0000000..762e327 --- /dev/null +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.frontend.beam.transform; + +import org.apache.beam.sdk.transforms.Materialization; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public final class CreateViewTransformTest { + + // [---- window1 --------] [--------------- window2 ---------------] + // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7 + // (null, "hello") + // (null, "world") + // (null, "hello") + // ==> window1: {3} (calculate # of elements) + // (null, "a") + // (null,"a") + // (null,"a") + // (null,"b") + // => window2: {4} (calculate # of elements) + @Test + @SuppressWarnings("unchecked") + public void test() { + + final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1)); + final CreateViewTransform viewTransform = + new CreateViewTransform(new SumViewFn()); + + final Instant ts1 = new Instant(1); + final Instant ts2 = new Instant(100); + final Instant ts3 = new Instant(300); + final Watermark watermark = new Watermark(1003); + final Instant ts4 = new Instant(1200); + final Watermark watermark2 = new Watermark(1400); + final Instant ts5 = new Instant(1600); + final Instant ts6 = new Instant(1800); + final Instant ts7 = new Instant(1900); + final Watermark watermark3 = new Watermark(2100); + + + final Transform.Context context = mock(Transform.Context.class); + final TestOutputCollector oc = new TestOutputCollector(); + viewTransform.prepare(context, oc); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING)); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING)); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING)); + + viewTransform.onWatermark(watermark); + + // materialized data + assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows()); + assertEquals(new Integer(3), oc.outputs.get(0).getValue()); + + // check output watermark + assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(), + oc.watermarks.get(0).getTimestamp()); + + oc.outputs.clear(); + oc.watermarks.clear(); + + + viewTransform.onData(WindowedValue.of( + KV.of(null, "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING)); + + // do not emit anything + viewTransform.onWatermark(watermark2); + assertEquals(0, oc.outputs.size()); + assertEquals(0, oc.watermarks.size()); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING)); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING)); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING)); + + // emit windowed value + viewTransform.onWatermark(watermark3); + + // materialized data + assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows()); + assertEquals(new Integer(4), oc.outputs.get(0).getValue()); + + // check output watermark + assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(), + oc.watermarks.get(0).getTimestamp()); + + oc.outputs.clear(); + + viewTransform.close(); + assertEquals(0, oc.outputs.size()); + } + + final class SumViewFn extends ViewFn, Integer> { + + @Override + public Materialization> getMaterialization() { + throw new UnsupportedOperationException(); + } + + @Override + public Integer apply(final Materializations.MultimapView view) { + int sum = 0; + // MultimapView.get is Nullable + for (String s : view.get(null)) { + sum += 1; + } + return sum; + } + } +}