From: echauchot@apache.org
To: "commits@beam.apache.org"
Reply-To: dev@beam.apache.org
Date: Tue, 09 Jul 2019 13:18:33 +0000
Subject: [beam] 16/45: Add TODO in Combine translations
Message-Id: <20190709131828.44A3B87AD7@gitbox.apache.org>
In-Reply-To: <156267829511.7091.14138832347784517506@gitbox.apache.org>
References: <156267829511.7091.14138832347784517506@gitbox.apache.org>
X-Git-Repo: beam
X-Git-Refname: refs/heads/spark-runner_structured-streaming
X-Git-Rev: 1a85496dd533364f281ec15cf9aa7f8391405823

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1a85496dd533364f281ec15cf9aa7f8391405823
Author: Etienne Chauchot
AuthorDate: Tue May 28 16:57:47 2019 +0200

    Add TODO in Combine translations
---
 .../translation/batch/CombineGloballyTranslatorBatch.java | 1 +
 .../translation/batch/CombinePerKeyTranslatorBatch.java   | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index fd66002..53651cf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -53,6 +53,7 @@ class CombineGloballyTranslatorBatch
     Dataset> inputDataset = context.getDataset(input);
+    //TODO merge windows instead of doing unwindow/window to comply with beam model
     Dataset unWindowedDataset = inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.genericEncoder());
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 5ff0048..3d0ee8b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -52,9 +52,11 @@ class CombinePerKeyTranslatorBatch
     Dataset>> inputDataset = context.getDataset(input);
+    //TODO merge windows instead of doing unwindow/window to comply with beam model
     Dataset> keyedDataset = inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
+    // TODO change extractKey impl to deal with WindowedVAlue and use it in GBK
     KeyValueGroupedDataset> groupedDataset = keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());