Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E170E200BAD for ; Tue, 25 Oct 2016 18:32:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E00E8160B02; Tue, 25 Oct 2016 16:32:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0A427160B03 for ; Tue, 25 Oct 2016 18:32:07 +0200 (CEST) Received: (qmail 6473 invoked by uid 500); 25 Oct 2016 16:32:07 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 6264 invoked by uid 99); 25 Oct 2016 16:32:07 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Oct 2016 16:32:07 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id AC5E9C0C95 for ; Tue, 25 Oct 2016 16:32:06 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id It6DgOUeBOCD for ; Tue, 25 Oct 2016 16:32:05 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id AB24360DC5 for ; Tue, 25 Oct 2016 16:32:04 +0000 (UTC) Received: (qmail 94097 invoked by uid 99); 25 Oct 2016 16:28:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Oct 2016 16:28:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8A87FF0DBF; Tue, 25 Oct 2016 16:28:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thw@apache.org To: commits@beam.incubator.apache.org Date: Tue, 25 Oct 2016 16:28:23 -0000 Message-Id: <5241d897c71543ed8f4804800ffb4e7e@git.apache.org> In-Reply-To: <43bd88bf5c0846b19987616570ecb213@git.apache.org> References: <43bd88bf5c0846b19987616570ecb213@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/50] incubator-beam git commit: Add OldDoFn.toDoFn() adapter archived-at: Tue, 25 Oct 2016 16:32:09 -0000 Add OldDoFn.toDoFn() adapter Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44878e57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44878e57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44878e57 Branch: refs/heads/apex-runner Commit: 44878e57eafc3ee6c437b8537dd1f88024006a51 Parents: fe0b7bf Author: Kenneth Knowles Authored: Wed Oct 19 20:44:52 2016 -0700 Committer: Kenneth Knowles Committed: Thu Oct 20 18:32:06 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/OldDoFn.java | 183 +++++++++++++++++++ 1 file changed, 183 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44878e57/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index a445c7d..912bf24 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -75,6 +75,23 @@ import org.joda.time.Instant; @Deprecated public abstract class OldDoFn implements Serializable, HasDisplayData { + public DoFn toDoFn() { + if (this instanceof RequiresWindowAccess) { + throw new UnsupportedOperationException( + String.format( + "Cannot convert %s to %s because it implements %s." + + " Please convert your %s to a %s directly.", + getClass(), + DoFn.class.getSimpleName(), + RequiresWindowAccess.class.getSimpleName(), + OldDoFn.class.getSimpleName(), + DoFn.class.getSimpleName())); + } + + // No parameters as it just accesses `this` + return new AdaptedDoFn(); + } + /** * Information accessible to all methods in this {@code OldDoFn}. * Used primarily to output elements. @@ -587,4 +604,170 @@ public abstract class OldDoFn implements Serializable, HasDispl return false; } } + + /** + * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}. + */ + private class AdaptedContext extends Context { + + private final DoFn.Context newContext; + + public AdaptedContext( + DoFn.Context newContext) { + this.newContext = newContext; + } + + @Override + public PipelineOptions getPipelineOptions() { + return newContext.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + newContext.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + newContext.outputWithTimestamp(output, timestamp); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + newContext.sideOutput(tag, output); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + newContext.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected Aggregator createAggregatorInternal( + String name, CombineFn combiner) { + return null; + } + } + + /** + * A {@link ProcessContext} for an {@link OldDoFn} via a context for a proper {@link DoFn}. + */ + private class AdaptedProcessContext extends ProcessContext { + + private final DoFn.ProcessContext newContext; + + public AdaptedProcessContext( + DoFn.ProcessContext newContext) { + this.newContext = newContext; + } + + @Override + public InputT element() { + return newContext.element(); + } + + @Override + public T sideInput(PCollectionView view) { + return newContext.sideInput(view); + } + + @Override + public Instant timestamp() { + return newContext.timestamp(); + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException(String.format( + "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s", + OldDoFn.class.getSimpleName(), + OldDoFn.ProcessContext.class.getSimpleName(), + OldDoFn.class.getSimpleName(), + DoFn.class.getSimpleName())); + } + + @Override + public PaneInfo pane() { + return newContext.pane(); + } + + @Override + public WindowingInternals windowingInternals() { + throw new UnsupportedOperationException(String.format( + "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s", + OldDoFn.class.getSimpleName(), + OldDoFn.ProcessContext.class.getSimpleName(), + OldDoFn.class.getSimpleName(), + DoFn.class.getSimpleName())); + } + + @Override + public PipelineOptions getPipelineOptions() { + return newContext.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + newContext.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + newContext.outputWithTimestamp(output, timestamp); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + newContext.sideOutput(tag, output); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + newContext.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected Aggregator createAggregatorInternal( + String name, CombineFn combiner) { + return null; + } + } + + private class AdaptedDoFn extends DoFn { + + @StartBundle + public void startBundle(DoFn.Context c) throws Exception { + OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); + } + + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { + OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c)); + } + + @FinishBundle + public void finishBundle(DoFn.Context c) throws Exception { + OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); + } + + @Override + public Duration getAllowedTimestampSkew() { + return OldDoFn.this.getAllowedTimestampSkew(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + OldDoFn.this.populateDisplayData(builder); + } + + @Override + protected TypeDescriptor getInputTypeDescriptor() { + return OldDoFn.this.getInputTypeDescriptor(); + } + + @Override + protected TypeDescriptor getOutputTypeDescriptor() { + return OldDoFn.this.getOutputTypeDescriptor(); + } + } }