From commits-return-362-archive-asf-public=cust-asf.ponee.io@nemo.apache.org Mon Nov 12 08:05:25 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D0DFD180660 for ; Mon, 12 Nov 2018 08:05:24 +0100 (CET) Received: (qmail 49745 invoked by uid 500); 12 Nov 2018 07:05:23 -0000 Mailing-List: contact commits-help@nemo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nemo.apache.org Delivered-To: mailing list commits@nemo.apache.org Received: (qmail 49736 invoked by uid 99); 12 Nov 2018 07:05:23 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Nov 2018 07:05:23 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 6031F829C4; Mon, 12 Nov 2018 07:05:23 +0000 (UTC) Date: Mon, 12 Nov 2018 07:05:23 +0000 To: "commits@nemo.apache.org" Subject: [incubator-nemo] branch master updated: [NEMO-279] Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154200632331.22019.17644070406582960189@gitbox.apache.org> From: johnyangk@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-nemo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: dbad69d4df40cc6dc64087526822e051de09aadf X-Git-Newrev: c5ef7ed5874d3ea6e662dde7a917186a8b80a76b X-Git-Rev: c5ef7ed5874d3ea6e662dde7a917186a8b80a76b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. johnyangk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git The following commit(s) were added to refs/heads/master by this push: new c5ef7ed [NEMO-279] Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156) c5ef7ed is described below commit c5ef7ed5874d3ea6e662dde7a917186a8b80a76b Author: Taegeon Um AuthorDate: Mon Nov 12 16:05:17 2018 +0900 [NEMO-279] Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE (#156) JIRA: [NEMO-279: Finish BeamUnboundedSource when emitting TIMESTAMP_MAX_VALUE](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-279) **Major changes:** - Set `isFinished` true in `BeamUnboundedSourceVertex` when emitting `TIMESTAMP_MAX_VALUE` in watermark. --- .../frontend/beam/source/BeamUnboundedSourceVertex.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java index 482dd9d..ad40d1b 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java @@ -20,10 +20,12 @@ package org.apache.nemo.compiler.frontend.beam.source; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.Readable; import org.apache.nemo.common.ir.vertex.IRVertex; import org.apache.nemo.common.ir.vertex.SourceVertex; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +103,7 @@ public final class BeamUnboundedSourceVertex reader; private boolean isStarted = false; private boolean isCurrentAvailable = false; + private boolean isFinished = false; UnboundedSourceReadable(final UnboundedSource unboundedSource) { this.unboundedSource = unboundedSource; @@ -138,12 +141,15 @@ public final class BeamUnboundedSourceVertex= GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()); + return watermark.getMillis(); } @Override public boolean isFinished() { - return false; + return isFinished; } @Override