Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C0FE7200C0E for ; Tue, 17 Jan 2017 18:53:09 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C0478160B46; Tue, 17 Jan 2017 17:53:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 75CC7160B30 for ; Tue, 17 Jan 2017 18:53:08 +0100 (CET) Received: (qmail 92343 invoked by uid 500); 17 Jan 2017 17:53:07 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 92333 invoked by uid 99); 17 Jan 2017 17:53:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jan 2017 17:53:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 897BFDFB0E; Tue, 17 Jan 2017 17:53:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Tue, 17 Jan 2017 17:53:07 -0000 Message-Id: <486645ae441d48fcbae13d78b4320b2d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/7] beam git commit: Removes code for wrapping DoFn as an OldDoFn archived-at: Tue, 17 Jan 2017 17:53:09 -0000 Repository: beam Updated Branches: refs/heads/master eaf4450f2 -> a91571ef9 Removes code for wrapping DoFn as an OldDoFn Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad5eb066 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad5eb066 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad5eb066 Branch: refs/heads/master Commit: ad5eb06619b724236ad0d2a384b8ecf4c610f1e4 Parents: f1ea8f9 Author: Eugene Kirpichov Authored: Fri Dec 9 17:21:40 2016 -0800 Committer: Eugene Kirpichov Committed: Thu Jan 12 12:55:27 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/transforms/DoFnAdapters.java | 150 ---------- .../org/apache/beam/sdk/transforms/OldDoFn.java | 295 +------------------ .../sdk/transforms/reflect/DoFnInvokers.java | 141 +-------- .../transforms/reflect/DoFnInvokersTest.java | 36 --- 4 files changed, 11 insertions(+), 611 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ad5eb066/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index e15b08b..d1c40a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -18,8 +18,6 @@ package org.apache.beam.sdk.transforms; import java.io.IOException; -import java.util.Collection; -import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn.Context; @@ -38,7 +36,6 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; @@ -53,18 +50,6 @@ public class DoFnAdapters { /** Should not be instantiated. */ private DoFnAdapters() {} - /** - * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the - * original {@link DoFn}, otherwise returns {@code fn.getClass()}. - */ - public static Class getDoFnClass(OldDoFn fn) { - if (fn instanceof SimpleDoFnAdapter) { - return ((SimpleDoFnAdapter) fn).fn.getClass(); - } else { - return fn.getClass(); - } - } - /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ @SuppressWarnings({"unchecked", "rawtypes"}) public static OldDoFn toOldDoFn(DoFn fn) { @@ -76,126 +61,6 @@ public class DoFnAdapters { } } - /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */ - public static OldDoFn.ProcessContext adaptProcessContext( - OldDoFn fn, - final DoFn.ProcessContext c, - final DoFnInvoker.ArgumentProvider extra) { - return fn.new ProcessContext() { - @Override - public InputT element() { - return c.element(); - } - - @Override - public T sideInput(PCollectionView view) { - return c.sideInput(view); - } - - @Override - public Instant timestamp() { - return c.timestamp(); - } - - @Override - public BoundedWindow window() { - return extra.window(); - } - - @Override - public PaneInfo pane() { - return c.pane(); - } - - @Override - public WindowingInternals windowingInternals() { - return extra.windowingInternals(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return c.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - c.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - c.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - c.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - c.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregatorInternal( - String name, CombineFn combiner) { - return c.createAggregator(name, combiner); - } - }; - } - - /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */ - public static OldDoFn.Context adaptContext( - OldDoFn fn, - final DoFn.Context c) { - return fn.new Context() { - @Override - public PipelineOptions getPipelineOptions() { - return c.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - c.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - c.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - c.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - c.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregatorInternal( - String name, CombineFn combiner) { - return c.createAggregator(name, combiner); - } - }; - } - - /** - * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise, - * returns {@code null}. - */ - @Nullable - public static DoFn getDoFn(OldDoFn fn) { - if (fn instanceof SimpleDoFnAdapter) { - return ((SimpleDoFnAdapter) fn).fn; - } else { - return null; - } - } - /** * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link * OldDoFn}. @@ -238,21 +103,6 @@ public class DoFnAdapters { } @Override - protected TypeDescriptor getInputTypeDescriptor() { - return fn.getInputTypeDescriptor(); - } - - @Override - protected TypeDescriptor getOutputTypeDescriptor() { - return fn.getOutputTypeDescriptor(); - } - - @Override - Collection> getAggregators() { - return fn.getAggregators(); - } - - @Override public Duration getAllowedTimestampSkew() { return fn.getAllowedTimestampSkew(); } http://git-wip-us.apache.org/repos/asf/beam/blob/ad5eb066/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index 2d2c1fd..0aef552 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; @@ -71,21 +70,6 @@ import org.joda.time.Instant; */ @Deprecated public abstract class OldDoFn implements Serializable, HasDisplayData { - - public DoFn toDoFn() { - DoFn doFn = DoFnAdapters.getDoFn(this); - if (doFn != null) { - return doFn; - } - if (this instanceof RequiresWindowAccess) { - // No parameters as it just accesses `this` - return new AdaptedRequiresWindowAccessDoFn(); - } else { - // No parameters as it just accesses `this` - return new AdaptedDoFn(); - } - } - /** * Information accessible to all methods in this {@code OldDoFn}. * Used primarily to output elements. @@ -334,7 +318,7 @@ public abstract class OldDoFn implements Serializable, HasDispl this(new HashMap>()); } - OldDoFn(Map> aggregators) { + public OldDoFn(Map> aggregators) { this.aggregators = aggregators; } @@ -419,32 +403,6 @@ public abstract class OldDoFn implements Serializable, HasDispl ///////////////////////////////////////////////////////////////////////////// /** - * Returns a {@link TypeDescriptor} capturing what is known statically - * about the input type of this {@code OldDoFn} instance's most-derived - * class. - * - *

See {@link #getOutputTypeDescriptor} for more discussion. - */ - protected TypeDescriptor getInputTypeDescriptor() { - return new TypeDescriptor(getClass()) {}; - } - - /** - * Returns a {@link TypeDescriptor} capturing what is known statically - * about the output type of this {@code OldDoFn} instance's - * most-derived class. - * - *

In the normal case of a concrete {@code OldDoFn} subclass with - * no generic type parameters of its own (including anonymous inner - * classes), this will be a complete non-generic type, which is good - * for choosing a default output {@code Coder} for the output - * {@code PCollection}. - */ - protected TypeDescriptor getOutputTypeDescriptor() { - return new TypeDescriptor(getClass()) {}; - } - - /** * Returns an {@link Aggregator} with aggregation logic specified by the * {@link CombineFn} argument. The name provided must be unique across * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created @@ -504,255 +462,4 @@ public abstract class OldDoFn implements Serializable, HasDispl Collection> getAggregators() { return Collections.>unmodifiableCollection(aggregators.values()); } - - /** - * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}. - */ - private class AdaptedContext extends Context { - - private final DoFn.Context newContext; - - public AdaptedContext( - DoFn.Context newContext) { - this.newContext = newContext; - super.setupDelegateAggregators(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return newContext.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - newContext.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - newContext.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - newContext.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - newContext.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregatorInternal( - String name, CombineFn combiner) { - return newContext.createAggregator(name, combiner); - } - } - - /** - * A {@link ProcessContext} for an {@link OldDoFn} via a context for a proper {@link DoFn}. - */ - private class AdaptedProcessContext extends ProcessContext { - - private final DoFn.ProcessContext newContext; - - public AdaptedProcessContext(DoFn.ProcessContext newContext) { - this.newContext = newContext; - } - - @Override - public InputT element() { - return newContext.element(); - } - - @Override - public T sideInput(PCollectionView view) { - return newContext.sideInput(view); - } - - @Override - public Instant timestamp() { - return newContext.timestamp(); - } - - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException(String.format( - "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s", - OldDoFn.class.getSimpleName(), - OldDoFn.ProcessContext.class.getSimpleName(), - OldDoFn.class.getSimpleName(), - DoFn.class.getSimpleName())); - } - - @Override - public PaneInfo pane() { - return newContext.pane(); - } - - @Override - public WindowingInternals windowingInternals() { - throw new UnsupportedOperationException(String.format( - "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s", - OldDoFn.class.getSimpleName(), - OldDoFn.ProcessContext.class.getSimpleName(), - OldDoFn.class.getSimpleName(), - DoFn.class.getSimpleName())); - } - - @Override - public PipelineOptions getPipelineOptions() { - return newContext.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - newContext.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - newContext.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - newContext.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - newContext.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregatorInternal( - String name, CombineFn combiner) { - return newContext.createAggregator(name, combiner); - } - } - - private class AdaptedDoFn extends DoFn { - - @Setup - public void setup() throws Exception { - OldDoFn.this.setup(); - } - - @StartBundle - public void startBundle(Context c) throws Exception { - OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c)); - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); - } - - @Teardown - public void teardown() throws Exception { - OldDoFn.this.teardown(); - } - - @Override - public Duration getAllowedTimestampSkew() { - return OldDoFn.this.getAllowedTimestampSkew(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - OldDoFn.this.populateDisplayData(builder); - } - - @Override - public TypeDescriptor getInputTypeDescriptor() { - return OldDoFn.this.getInputTypeDescriptor(); - } - - @Override - Collection> getAggregators() { - return OldDoFn.this.getAggregators(); - } - - @Override - public TypeDescriptor getOutputTypeDescriptor() { - return OldDoFn.this.getOutputTypeDescriptor(); - } - } - - /** - * A {@link ProcessContext} for an {@link OldDoFn} that implements - * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}. - */ - private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext { - - private final BoundedWindow window; - - public AdaptedRequiresWindowAccessProcessContext( - DoFn.ProcessContext newContext, - BoundedWindow window) { - super(newContext); - this.window = window; - } - - @Override - public BoundedWindow window() { - return window; - } - } - - private class AdaptedRequiresWindowAccessDoFn extends DoFn { - - @Setup - public void setup() throws Exception { - OldDoFn.this.setup(); - } - - @StartBundle - public void startBundle(Context c) throws Exception { - OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - OldDoFn.this.processElement( - OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window)); - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); - } - - @Teardown - public void teardown() throws Exception { - OldDoFn.this.teardown(); - } - - @Override - public Duration getAllowedTimestampSkew() { - return OldDoFn.this.getAllowedTimestampSkew(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - OldDoFn.this.populateDisplayData(builder); - } - - @Override - public TypeDescriptor getInputTypeDescriptor() { - return OldDoFn.this.getInputTypeDescriptor(); - } - - @Override - public TypeDescriptor getOutputTypeDescriptor() { - return OldDoFn.this.getOutputTypeDescriptor(); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/ad5eb066/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 50a7082..b141d51 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -18,13 +18,7 @@ package org.apache.beam.sdk.transforms.reflect; import java.io.Serializable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.util.UserCodeException; /** Static utilities for working with {@link DoFnInvoker}. */ public class DoFnInvokers { @@ -42,137 +36,22 @@ public class DoFnInvokers { return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn); } - private DoFnInvokers() {} - /** - * Returns a {@link DoFnInvoker} for the given {@link Object}, which should be either a {@link - * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's function as an - * {@link Object} and then pass it to this method, so there is no need to statically specify what - * sort of object it is. + * Temporarily retained for compatibility with Dataflow worker. + * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}. * - * @deprecated this is to be used only as a migration path for decoupling upgrades + * @deprecated Use {@link #invokerFor(DoFn)}. */ + @SuppressWarnings("unchecked") @Deprecated - public static DoFnInvoker invokerFor(Serializable deserializedFn) { + public static DoFnInvoker invokerFor( + Serializable deserializedFn) { if (deserializedFn instanceof DoFn) { - return invokerFor((DoFn) deserializedFn); - } else if (deserializedFn instanceof OldDoFn) { - return new OldDoFnInvoker<>((OldDoFn) deserializedFn); - } else { - throw new IllegalArgumentException( - String.format( - "Cannot create a %s for %s; it should be either a %s or an %s.", - DoFnInvoker.class.getSimpleName(), - deserializedFn.toString(), - DoFn.class.getSimpleName(), - OldDoFn.class.getSimpleName())); + return invokerFor((DoFn) deserializedFn); } + throw new UnsupportedOperationException( + "Only DoFn supported, was: " + deserializedFn.getClass()); } - /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */ - @Deprecated public static final DoFnInvokers INSTANCE = new DoFnInvokers(); - - /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */ - @Deprecated - public DoFnInvoker invokerFor(Object deserializedFn) { - return (DoFnInvoker) DoFnInvokers.invokerFor((Serializable) deserializedFn); - } - - - static class OldDoFnInvoker implements DoFnInvoker { - - private final OldDoFn fn; - - public OldDoFnInvoker(OldDoFn fn) { - this.fn = fn; - } - - @Override - public DoFn.ProcessContinuation invokeProcessElement( - ArgumentProvider extra) { - // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly - DoFn.ProcessContext newCtx = - extra.processContext(new DoFn() {}); - OldDoFn.ProcessContext oldCtx = - DoFnAdapters.adaptProcessContext(fn, newCtx, extra); - try { - fn.processElement(oldCtx); - return DoFn.ProcessContinuation.stop(); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public void invokeOnTimer(String timerId, ArgumentProvider arguments) { - throw new UnsupportedOperationException( - String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName())); - } - - @Override - public void invokeStartBundle(DoFn.Context c) { - OldDoFn.Context oldCtx = DoFnAdapters.adaptContext(fn, c); - try { - fn.startBundle(oldCtx); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public void invokeFinishBundle(DoFn.Context c) { - OldDoFn.Context oldCtx = DoFnAdapters.adaptContext(fn, c); - try { - fn.finishBundle(oldCtx); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public void invokeSetup() { - try { - fn.setup(); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public void invokeTeardown() { - try { - fn.teardown(); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public RestrictionT invokeGetInitialRestriction(InputT element) { - throw new UnsupportedOperationException("OldDoFn is not splittable"); - } - - @Override - public Coder invokeGetRestrictionCoder( - CoderRegistry coderRegistry) { - throw new UnsupportedOperationException("OldDoFn is not splittable"); - } - - @Override - public void invokeSplitRestriction( - InputT element, RestrictionT restriction, DoFn.OutputReceiver receiver) { - throw new UnsupportedOperationException("OldDoFn is not splittable"); - } - - @Override - public > - TrackerT invokeNewTracker(RestrictionT restriction) { - throw new UnsupportedOperationException("OldDoFn is not splittable"); - } - - @Override - public DoFn getFn() { - throw new UnsupportedOperationException("getFn is not supported for OldDoFn"); - } - } + private DoFnInvokers() {} } http://git-wip-us.apache.org/repos/asf/beam/blob/ad5eb066/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 456a6eb..55b8e7e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -738,39 +737,4 @@ public class DoFnInvokersTest { invoker.invokeOnTimer(timerId, mockArgumentProvider); assertThat(fn.window, equalTo(testWindow)); } - - private class OldDoFnIdentity extends OldDoFn { - public void processElement(ProcessContext c) {} - } - - @Test - public void testOldDoFnProcessElement() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn) - .invokeProcessElement(mockArgumentProvider); - verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class)); - } - - @Test - public void testOldDoFnStartBundle() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockProcessContext); - verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class)); - } - - @Test - public void testOldDoFnFinishBundle() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockProcessContext); - verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class)); - } - - @Test - public void testOldDoFnSetup() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup(); - verify(mockOldDoFn).setup(); - } - - @Test - public void testOldDoFnTeardown() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown(); - verify(mockOldDoFn).teardown(); - } }